1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::{read_or_recover, write_or_recover, Result};
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
39fn safe_canonicalize(path: &Path) -> PathBuf {
42 match std::fs::canonicalize(path) {
43 Ok(p) => strip_unc_prefix(p),
44 Err(_) => path.to_path_buf(),
45 }
46}
47
48fn strip_unc_prefix(path: PathBuf) -> PathBuf {
51 #[cfg(windows)]
52 {
53 let s = path.to_string_lossy();
54 if let Some(stripped) = s.strip_prefix(r"\\?\") {
55 return PathBuf::from(stripped);
56 }
57 }
58 path
59}
60
61#[derive(Debug, Clone)]
67pub struct ToolCallResult {
68 pub name: String,
69 pub output: String,
70 pub exit_code: i32,
71}
72
73#[derive(Clone, Default)]
79pub struct SessionOptions {
80 pub model: Option<String>,
82 pub agent_dirs: Vec<PathBuf>,
85 pub queue_config: Option<SessionQueueConfig>,
90 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
92 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
94 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
96 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
98 pub planning_enabled: bool,
100 pub goal_tracking: bool,
102 pub skill_dirs: Vec<PathBuf>,
105 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
107 pub memory_store: Option<Arc<dyn MemoryStore>>,
109 pub(crate) file_memory_dir: Option<PathBuf>,
111 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
113 pub session_id: Option<String>,
115 pub auto_save: bool,
117 pub max_parse_retries: Option<u32>,
120 pub tool_timeout_ms: Option<u64>,
123 pub circuit_breaker_threshold: Option<u32>,
127 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
133 pub auto_compact: bool,
135 pub auto_compact_threshold: Option<f32>,
138 pub continuation_enabled: Option<bool>,
141 pub max_continuation_turns: Option<u32>,
144 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
149 pub temperature: Option<f32>,
151 pub thinking_budget: Option<usize>,
153 pub prompt_slots: Option<SystemPromptSlots>,
159}
160
161impl std::fmt::Debug for SessionOptions {
162 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
163 f.debug_struct("SessionOptions")
164 .field("model", &self.model)
165 .field("agent_dirs", &self.agent_dirs)
166 .field("skill_dirs", &self.skill_dirs)
167 .field("queue_config", &self.queue_config)
168 .field("security_provider", &self.security_provider.is_some())
169 .field("context_providers", &self.context_providers.len())
170 .field("confirmation_manager", &self.confirmation_manager.is_some())
171 .field("permission_checker", &self.permission_checker.is_some())
172 .field("planning_enabled", &self.planning_enabled)
173 .field("goal_tracking", &self.goal_tracking)
174 .field(
175 "skill_registry",
176 &self
177 .skill_registry
178 .as_ref()
179 .map(|r| format!("{} skills", r.len())),
180 )
181 .field("memory_store", &self.memory_store.is_some())
182 .field("session_store", &self.session_store.is_some())
183 .field("session_id", &self.session_id)
184 .field("auto_save", &self.auto_save)
185 .field("max_parse_retries", &self.max_parse_retries)
186 .field("tool_timeout_ms", &self.tool_timeout_ms)
187 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
188 .field("sandbox_config", &self.sandbox_config)
189 .field("auto_compact", &self.auto_compact)
190 .field("auto_compact_threshold", &self.auto_compact_threshold)
191 .field("continuation_enabled", &self.continuation_enabled)
192 .field("max_continuation_turns", &self.max_continuation_turns)
193 .field("mcp_manager", &self.mcp_manager.is_some())
194 .field("temperature", &self.temperature)
195 .field("thinking_budget", &self.thinking_budget)
196 .field("prompt_slots", &self.prompt_slots.is_some())
197 .finish()
198 }
199}
200
201impl SessionOptions {
202 pub fn new() -> Self {
203 Self::default()
204 }
205
206 pub fn with_model(mut self, model: impl Into<String>) -> Self {
207 self.model = Some(model.into());
208 self
209 }
210
211 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
212 self.agent_dirs.push(dir.into());
213 self
214 }
215
216 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
217 self.queue_config = Some(config);
218 self
219 }
220
221 pub fn with_default_security(mut self) -> Self {
223 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
224 self
225 }
226
227 pub fn with_security_provider(
229 mut self,
230 provider: Arc<dyn crate::security::SecurityProvider>,
231 ) -> Self {
232 self.security_provider = Some(provider);
233 self
234 }
235
236 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
238 let config = crate::context::FileSystemContextConfig::new(root_path);
239 self.context_providers
240 .push(Arc::new(crate::context::FileSystemContextProvider::new(
241 config,
242 )));
243 self
244 }
245
246 pub fn with_context_provider(
248 mut self,
249 provider: Arc<dyn crate::context::ContextProvider>,
250 ) -> Self {
251 self.context_providers.push(provider);
252 self
253 }
254
255 pub fn with_confirmation_manager(
257 mut self,
258 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
259 ) -> Self {
260 self.confirmation_manager = Some(manager);
261 self
262 }
263
264 pub fn with_permission_checker(
266 mut self,
267 checker: Arc<dyn crate::permissions::PermissionChecker>,
268 ) -> Self {
269 self.permission_checker = Some(checker);
270 self
271 }
272
273 pub fn with_permissive_policy(self) -> Self {
280 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
281 }
282
283 pub fn with_planning(mut self, enabled: bool) -> Self {
285 self.planning_enabled = enabled;
286 self
287 }
288
289 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
291 self.goal_tracking = enabled;
292 self
293 }
294
295 pub fn with_builtin_skills(mut self) -> Self {
297 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
298 self
299 }
300
301 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
303 self.skill_registry = Some(registry);
304 self
305 }
306
307 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
310 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
311 self
312 }
313
314 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
316 let registry = self
317 .skill_registry
318 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
319 if let Err(e) = registry.load_from_dir(&dir) {
320 tracing::warn!(
321 dir = %dir.as_ref().display(),
322 error = %e,
323 "Failed to load skills from directory — continuing without them"
324 );
325 }
326 self.skill_registry = Some(registry);
327 self
328 }
329
330 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
332 self.memory_store = Some(store);
333 self
334 }
335
336 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
342 self.file_memory_dir = Some(dir.into());
343 self
344 }
345
346 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
348 self.session_store = Some(store);
349 self
350 }
351
352 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
354 let dir = dir.into();
355 match tokio::runtime::Handle::try_current() {
356 Ok(handle) => {
357 match tokio::task::block_in_place(|| {
358 handle.block_on(crate::store::FileSessionStore::new(dir))
359 }) {
360 Ok(store) => {
361 self.session_store =
362 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
363 }
364 Err(e) => {
365 tracing::warn!("Failed to create file session store: {}", e);
366 }
367 }
368 }
369 Err(_) => {
370 tracing::warn!(
371 "No async runtime available for file session store — persistence disabled"
372 );
373 }
374 }
375 self
376 }
377
378 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
380 self.session_id = Some(id.into());
381 self
382 }
383
384 pub fn with_auto_save(mut self, enabled: bool) -> Self {
386 self.auto_save = enabled;
387 self
388 }
389
390 pub fn with_parse_retries(mut self, max: u32) -> Self {
396 self.max_parse_retries = Some(max);
397 self
398 }
399
400 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
406 self.tool_timeout_ms = Some(timeout_ms);
407 self
408 }
409
410 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
416 self.circuit_breaker_threshold = Some(threshold);
417 self
418 }
419
420 pub fn with_resilience_defaults(self) -> Self {
426 self.with_parse_retries(2)
427 .with_tool_timeout(120_000)
428 .with_circuit_breaker(3)
429 }
430
431 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
450 self.sandbox_config = Some(config);
451 self
452 }
453
454 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
459 self.auto_compact = enabled;
460 self
461 }
462
463 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
465 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
466 self
467 }
468
469 pub fn with_continuation(mut self, enabled: bool) -> Self {
474 self.continuation_enabled = Some(enabled);
475 self
476 }
477
478 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
480 self.max_continuation_turns = Some(turns);
481 self
482 }
483
484 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
489 self.mcp_manager = Some(manager);
490 self
491 }
492
493 pub fn with_temperature(mut self, temperature: f32) -> Self {
494 self.temperature = Some(temperature);
495 self
496 }
497
498 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
499 self.thinking_budget = Some(budget);
500 self
501 }
502
503 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
508 self.prompt_slots = Some(slots);
509 self
510 }
511}
512
513pub struct Agent {
522 llm_client: Arc<dyn LlmClient>,
523 code_config: CodeConfig,
524 config: AgentConfig,
525 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
527 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
530}
531
532impl std::fmt::Debug for Agent {
533 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
534 f.debug_struct("Agent").finish()
535 }
536}
537
538impl Agent {
539 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
543 let source = config_source.into();
544
545 let expanded = if let Some(rest) = source.strip_prefix("~/") {
547 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
548 if let Some(home) = home {
549 PathBuf::from(home).join(rest).display().to_string()
550 } else {
551 source.clone()
552 }
553 } else {
554 source.clone()
555 };
556
557 let path = Path::new(&expanded);
558
559 let config = if path.extension().is_some() && path.exists() {
560 CodeConfig::from_file(path)
561 .with_context(|| format!("Failed to load config: {}", path.display()))?
562 } else {
563 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
565 };
566
567 Self::from_config(config).await
568 }
569
570 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
575 Self::new(config_source).await
576 }
577
578 pub async fn from_config(config: CodeConfig) -> Result<Self> {
580 let llm_config = config
581 .default_llm_config()
582 .context("default_model must be set in 'provider/model' format with a valid API key")?;
583 let llm_client = crate::llm::create_client_with_config(llm_config);
584
585 let agent_config = AgentConfig {
586 max_tool_rounds: config
587 .max_tool_rounds
588 .unwrap_or(AgentConfig::default().max_tool_rounds),
589 ..AgentConfig::default()
590 };
591
592 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
594 (None, vec![])
595 } else {
596 let manager = Arc::new(crate::mcp::manager::McpManager::new());
597 for server in &config.mcp_servers {
598 if !server.enabled {
599 continue;
600 }
601 manager.register_server(server.clone()).await;
602 if let Err(e) = manager.connect(&server.name).await {
603 tracing::warn!(
604 server = %server.name,
605 error = %e,
606 "Failed to connect to MCP server — skipping"
607 );
608 }
609 }
610 let tools = manager.get_all_tools().await;
612 (Some(manager), tools)
613 };
614
615 let mut agent = Agent {
616 llm_client,
617 code_config: config,
618 config: agent_config,
619 global_mcp,
620 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
621 };
622
623 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
625 for dir in &agent.code_config.skill_dirs.clone() {
626 if let Err(e) = registry.load_from_dir(dir) {
627 tracing::warn!(
628 dir = %dir.display(),
629 error = %e,
630 "Failed to load skills from directory — skipping"
631 );
632 }
633 }
634 agent.config.skill_registry = Some(registry);
635
636 Ok(agent)
637 }
638
639 pub async fn refresh_mcp_tools(&self) -> Result<()> {
647 if let Some(ref mcp) = self.global_mcp {
648 let fresh = mcp.get_all_tools().await;
649 *self
650 .global_mcp_tools
651 .lock()
652 .expect("global_mcp_tools lock poisoned") = fresh;
653 }
654 Ok(())
655 }
656
657 pub fn session(
662 &self,
663 workspace: impl Into<String>,
664 options: Option<SessionOptions>,
665 ) -> Result<AgentSession> {
666 let opts = options.unwrap_or_default();
667
668 let llm_client = if let Some(ref model) = opts.model {
669 let (provider_name, model_id) = model
670 .split_once('/')
671 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
672
673 let mut llm_config = self
674 .code_config
675 .llm_config(provider_name, model_id)
676 .with_context(|| {
677 format!("provider '{provider_name}' or model '{model_id}' not found in config")
678 })?;
679
680 if let Some(temp) = opts.temperature {
681 llm_config = llm_config.with_temperature(temp);
682 }
683 if let Some(budget) = opts.thinking_budget {
684 llm_config = llm_config.with_thinking_budget(budget);
685 }
686
687 crate::llm::create_client_with_config(llm_config)
688 } else {
689 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
690 tracing::warn!(
691 "temperature/thinking_budget set without model override — these will be ignored. \
692 Use with_model() to apply LLM parameter overrides."
693 );
694 }
695 self.llm_client.clone()
696 };
697
698 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
701 (Some(global), Some(session)) => {
702 let global = Arc::clone(global);
703 let session_mgr = Arc::clone(session);
704 match tokio::runtime::Handle::try_current() {
705 Ok(handle) => {
706 let global_for_merge = Arc::clone(&global);
707 tokio::task::block_in_place(|| {
708 handle.block_on(async move {
709 for config in session_mgr.all_configs().await {
710 let name = config.name.clone();
711 global_for_merge.register_server(config).await;
712 if let Err(e) = global_for_merge.connect(&name).await {
713 tracing::warn!(
714 server = %name,
715 error = %e,
716 "Failed to connect session-level MCP server — skipping"
717 );
718 }
719 }
720 })
721 });
722 }
723 Err(_) => {
724 tracing::warn!(
725 "No async runtime available to merge session-level MCP servers \
726 into global manager — session MCP servers will not be available"
727 );
728 }
729 }
730 SessionOptions {
731 mcp_manager: Some(Arc::clone(&global)),
732 ..opts
733 }
734 }
735 (Some(global), None) => SessionOptions {
736 mcp_manager: Some(Arc::clone(global)),
737 ..opts
738 },
739 _ => opts,
740 };
741
742 self.build_session(workspace.into(), llm_client, &merged_opts)
743 }
744
745 pub fn resume_session(
753 &self,
754 session_id: &str,
755 options: SessionOptions,
756 ) -> Result<AgentSession> {
757 let store = options.session_store.as_ref().ok_or_else(|| {
758 crate::error::CodeError::Session(
759 "resume_session requires a session_store in SessionOptions".to_string(),
760 )
761 })?;
762
763 let data = match tokio::runtime::Handle::try_current() {
765 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
766 .map_err(|e| {
767 crate::error::CodeError::Session(format!(
768 "Failed to load session {}: {}",
769 session_id, e
770 ))
771 })?,
772 Err(_) => {
773 return Err(crate::error::CodeError::Session(
774 "No async runtime available for session resume".to_string(),
775 ))
776 }
777 };
778
779 let data = data.ok_or_else(|| {
780 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
781 })?;
782
783 let mut opts = options;
785 opts.session_id = Some(data.id.clone());
786
787 let llm_client = if let Some(ref model) = opts.model {
788 let (provider_name, model_id) = model
789 .split_once('/')
790 .context("model format must be 'provider/model'")?;
791 let llm_config = self
792 .code_config
793 .llm_config(provider_name, model_id)
794 .with_context(|| {
795 format!("provider '{provider_name}' or model '{model_id}' not found")
796 })?;
797 crate::llm::create_client_with_config(llm_config)
798 } else {
799 self.llm_client.clone()
800 };
801
802 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
803
804 *write_or_recover(&session.history) = data.messages;
806
807 Ok(session)
808 }
809
810 fn build_session(
811 &self,
812 workspace: String,
813 llm_client: Arc<dyn LlmClient>,
814 opts: &SessionOptions,
815 ) -> Result<AgentSession> {
816 let canonical = safe_canonicalize(Path::new(&workspace));
817
818 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
819
820 {
824 use crate::subagent::{load_agents_from_dir, AgentRegistry};
825 use crate::tools::register_task_with_mcp;
826 let agent_registry = AgentRegistry::new();
827 for dir in self
828 .code_config
829 .agent_dirs
830 .iter()
831 .chain(opts.agent_dirs.iter())
832 {
833 for agent in load_agents_from_dir(dir) {
834 agent_registry.register(agent);
835 }
836 }
837 register_task_with_mcp(
838 tool_executor.registry(),
839 Arc::clone(&llm_client),
840 Arc::new(agent_registry),
841 canonical.display().to_string(),
842 opts.mcp_manager.clone(),
843 );
844 }
845
846 if let Some(ref mcp) = opts.mcp_manager {
849 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
852 Arc::as_ptr(mcp),
853 self.global_mcp
854 .as_ref()
855 .map(Arc::as_ptr)
856 .unwrap_or(std::ptr::null()),
857 ) {
858 self.global_mcp_tools
860 .lock()
861 .expect("global_mcp_tools lock poisoned")
862 .clone()
863 } else {
864 match tokio::runtime::Handle::try_current() {
866 Ok(handle) => {
867 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
868 }
869 Err(_) => {
870 tracing::warn!(
871 "No async runtime available for session-level MCP tools — \
872 MCP tools will not be registered"
873 );
874 vec![]
875 }
876 }
877 };
878
879 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
880 std::collections::HashMap::new();
881 for (server, tool) in all_tools {
882 by_server.entry(server).or_default().push(tool);
883 }
884 for (server_name, tools) in by_server {
885 for tool in
886 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
887 {
888 tool_executor.register_dynamic_tool(tool);
889 }
890 }
891 }
892
893 let tool_defs = tool_executor.definitions();
894
895 let mut prompt_slots = opts
897 .prompt_slots
898 .clone()
899 .unwrap_or_else(|| self.config.prompt_slots.clone());
900
901 let base_registry = self
905 .config
906 .skill_registry
907 .as_deref()
908 .map(|r| r.fork())
909 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
910 if let Some(ref r) = opts.skill_registry {
912 for skill in r.all() {
913 base_registry.register_unchecked(skill);
914 }
915 }
916 for dir in &opts.skill_dirs {
918 if let Err(e) = base_registry.load_from_dir(dir) {
919 tracing::warn!(
920 dir = %dir.display(),
921 error = %e,
922 "Failed to load session skill dir — skipping"
923 );
924 }
925 }
926 let effective_registry = Arc::new(base_registry);
927
928 let skill_prompt = effective_registry.to_system_prompt();
930 if !skill_prompt.is_empty() {
931 prompt_slots.extra = match prompt_slots.extra {
932 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
933 None => Some(skill_prompt),
934 };
935 }
936
937 let mut init_warning: Option<String> = None;
939 let memory = {
940 let store = if let Some(ref store) = opts.memory_store {
941 Some(Arc::clone(store))
942 } else if let Some(ref dir) = opts.file_memory_dir {
943 match tokio::runtime::Handle::try_current() {
944 Ok(handle) => {
945 let dir = dir.clone();
946 match tokio::task::block_in_place(|| {
947 handle.block_on(FileMemoryStore::new(dir))
948 }) {
949 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
950 Err(e) => {
951 let msg = format!("Failed to create file memory store: {}", e);
952 tracing::warn!("{}", msg);
953 init_warning = Some(msg);
954 None
955 }
956 }
957 }
958 Err(_) => {
959 let msg =
960 "No async runtime available for file memory store — memory disabled"
961 .to_string();
962 tracing::warn!("{}", msg);
963 init_warning = Some(msg);
964 None
965 }
966 }
967 } else {
968 None
969 };
970 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
971 };
972
973 let base = self.config.clone();
974 let config = AgentConfig {
975 prompt_slots,
976 tools: tool_defs,
977 security_provider: opts.security_provider.clone(),
978 permission_checker: opts.permission_checker.clone(),
979 confirmation_manager: opts.confirmation_manager.clone(),
980 context_providers: opts.context_providers.clone(),
981 planning_enabled: opts.planning_enabled,
982 goal_tracking: opts.goal_tracking,
983 skill_registry: Some(effective_registry),
984 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
985 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
986 circuit_breaker_threshold: opts
987 .circuit_breaker_threshold
988 .unwrap_or(base.circuit_breaker_threshold),
989 auto_compact: opts.auto_compact,
990 auto_compact_threshold: opts
991 .auto_compact_threshold
992 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
993 max_context_tokens: base.max_context_tokens,
994 llm_client: Some(Arc::clone(&llm_client)),
995 memory: memory.clone(),
996 continuation_enabled: opts
997 .continuation_enabled
998 .unwrap_or(base.continuation_enabled),
999 max_continuation_turns: opts
1000 .max_continuation_turns
1001 .unwrap_or(base.max_continuation_turns),
1002 ..base
1003 };
1004
1005 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1008 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1009 let session_id = uuid::Uuid::new_v4().to_string();
1010 let rt = tokio::runtime::Handle::try_current();
1011
1012 match rt {
1013 Ok(handle) => {
1014 let queue = tokio::task::block_in_place(|| {
1016 handle.block_on(SessionLaneQueue::new(
1017 &session_id,
1018 queue_config.clone(),
1019 agent_event_tx.clone(),
1020 ))
1021 });
1022 match queue {
1023 Ok(q) => {
1024 let q = Arc::new(q);
1026 let q2 = Arc::clone(&q);
1027 tokio::task::block_in_place(|| {
1028 handle.block_on(async { q2.start().await.ok() })
1029 });
1030 Some(q)
1031 }
1032 Err(e) => {
1033 tracing::warn!("Failed to create session lane queue: {}", e);
1034 None
1035 }
1036 }
1037 }
1038 Err(_) => {
1039 tracing::warn!(
1040 "No async runtime available for queue creation — queue disabled"
1041 );
1042 None
1043 }
1044 }
1045 } else {
1046 None
1047 };
1048
1049 let mut tool_context = ToolContext::new(canonical.clone());
1051 if let Some(ref search_config) = self.code_config.search {
1052 tool_context = tool_context.with_search_config(search_config.clone());
1053 }
1054 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1055
1056 #[cfg(feature = "sandbox")]
1058 if let Some(ref sandbox_cfg) = opts.sandbox_config {
1059 let handle: Arc<dyn crate::sandbox::BashSandbox> =
1060 Arc::new(crate::sandbox::BoxSandboxHandle::new(
1061 sandbox_cfg.clone(),
1062 canonical.display().to_string(),
1063 ));
1064 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1067 tool_context = tool_context.with_sandbox(handle);
1068 }
1069 #[cfg(not(feature = "sandbox"))]
1070 if opts.sandbox_config.is_some() {
1071 tracing::warn!(
1072 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
1073 — bash commands will run locally"
1074 );
1075 }
1076
1077 let session_id = opts
1078 .session_id
1079 .clone()
1080 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1081
1082 let session_store = if opts.session_store.is_some() {
1084 opts.session_store.clone()
1085 } else if let Some(ref dir) = self.code_config.sessions_dir {
1086 match tokio::runtime::Handle::try_current() {
1087 Ok(handle) => {
1088 let dir = dir.clone();
1089 match tokio::task::block_in_place(|| {
1090 handle.block_on(crate::store::FileSessionStore::new(dir))
1091 }) {
1092 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1093 Err(e) => {
1094 tracing::warn!(
1095 "Failed to create session store from sessions_dir: {}",
1096 e
1097 );
1098 None
1099 }
1100 }
1101 }
1102 Err(_) => {
1103 tracing::warn!(
1104 "No async runtime for sessions_dir store — persistence disabled"
1105 );
1106 None
1107 }
1108 }
1109 } else {
1110 None
1111 };
1112
1113 Ok(AgentSession {
1114 llm_client,
1115 tool_executor,
1116 tool_context,
1117 memory: config.memory.clone(),
1118 config,
1119 workspace: canonical,
1120 session_id,
1121 history: RwLock::new(Vec::new()),
1122 command_queue,
1123 session_store,
1124 auto_save: opts.auto_save,
1125 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1126 init_warning,
1127 command_registry: CommandRegistry::new(),
1128 model_name: opts
1129 .model
1130 .clone()
1131 .or_else(|| self.code_config.default_model.clone())
1132 .unwrap_or_else(|| "unknown".to_string()),
1133 mcp_manager: opts
1134 .mcp_manager
1135 .clone()
1136 .or_else(|| self.global_mcp.clone())
1137 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1138 })
1139 }
1140}
1141
1142pub struct AgentSession {
1151 llm_client: Arc<dyn LlmClient>,
1152 tool_executor: Arc<ToolExecutor>,
1153 tool_context: ToolContext,
1154 config: AgentConfig,
1155 workspace: PathBuf,
1156 session_id: String,
1158 history: RwLock<Vec<Message>>,
1160 command_queue: Option<Arc<SessionLaneQueue>>,
1162 memory: Option<Arc<crate::memory::AgentMemory>>,
1164 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1166 auto_save: bool,
1168 hook_engine: Arc<crate::hooks::HookEngine>,
1170 init_warning: Option<String>,
1172 command_registry: CommandRegistry,
1174 model_name: String,
1176 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1178}
1179
1180impl std::fmt::Debug for AgentSession {
1181 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1182 f.debug_struct("AgentSession")
1183 .field("session_id", &self.session_id)
1184 .field("workspace", &self.workspace.display().to_string())
1185 .field("auto_save", &self.auto_save)
1186 .finish()
1187 }
1188}
1189
1190impl AgentSession {
1191 fn build_agent_loop(&self) -> AgentLoop {
1195 let mut config = self.config.clone();
1196 config.hook_engine =
1197 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1198 config.tools = self.tool_executor.definitions();
1202 let mut agent_loop = AgentLoop::new(
1203 self.llm_client.clone(),
1204 self.tool_executor.clone(),
1205 self.tool_context.clone(),
1206 config,
1207 );
1208 if let Some(ref queue) = self.command_queue {
1209 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1210 }
1211 agent_loop
1212 }
1213
1214 fn build_command_context(&self) -> CommandContext {
1216 let history = read_or_recover(&self.history);
1217
1218 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1220
1221 let mut mcp_map: std::collections::HashMap<String, usize> =
1223 std::collections::HashMap::new();
1224 for name in &tool_names {
1225 if let Some(rest) = name.strip_prefix("mcp__") {
1226 if let Some((server, _)) = rest.split_once("__") {
1227 *mcp_map.entry(server.to_string()).or_default() += 1;
1228 }
1229 }
1230 }
1231 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1232 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1233
1234 CommandContext {
1235 session_id: self.session_id.clone(),
1236 workspace: self.workspace.display().to_string(),
1237 model: self.model_name.clone(),
1238 history_len: history.len(),
1239 total_tokens: 0,
1240 total_cost: 0.0,
1241 tool_names,
1242 mcp_servers,
1243 }
1244 }
1245
1246 pub fn command_registry(&self) -> &CommandRegistry {
1248 &self.command_registry
1249 }
1250
1251 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1253 self.command_registry.register(cmd);
1254 }
1255
1256 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1265 if CommandRegistry::is_command(prompt) {
1267 let ctx = self.build_command_context();
1268 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1269 return Ok(AgentResult {
1270 text: output.text,
1271 messages: history
1272 .map(|h| h.to_vec())
1273 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1274 tool_calls_count: 0,
1275 usage: crate::llm::TokenUsage::default(),
1276 });
1277 }
1278 }
1279
1280 if let Some(ref w) = self.init_warning {
1281 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1282 }
1283 let agent_loop = self.build_agent_loop();
1284
1285 let use_internal = history.is_none();
1286 let effective_history = match history {
1287 Some(h) => h.to_vec(),
1288 None => read_or_recover(&self.history).clone(),
1289 };
1290
1291 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1292
1293 if use_internal {
1296 *write_or_recover(&self.history) = result.messages.clone();
1297
1298 if self.auto_save {
1300 if let Err(e) = self.save().await {
1301 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1302 }
1303 }
1304 }
1305
1306 Ok(result)
1307 }
1308
1309 pub async fn send_with_attachments(
1314 &self,
1315 prompt: &str,
1316 attachments: &[crate::llm::Attachment],
1317 history: Option<&[Message]>,
1318 ) -> Result<AgentResult> {
1319 let use_internal = history.is_none();
1323 let mut effective_history = match history {
1324 Some(h) => h.to_vec(),
1325 None => read_or_recover(&self.history).clone(),
1326 };
1327 effective_history.push(Message::user_with_attachments(prompt, attachments));
1328
1329 let agent_loop = self.build_agent_loop();
1330 let result = agent_loop
1331 .execute_from_messages(effective_history, None, None)
1332 .await?;
1333
1334 if use_internal {
1335 *write_or_recover(&self.history) = result.messages.clone();
1336 if self.auto_save {
1337 if let Err(e) = self.save().await {
1338 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1339 }
1340 }
1341 }
1342
1343 Ok(result)
1344 }
1345
1346 pub async fn stream_with_attachments(
1351 &self,
1352 prompt: &str,
1353 attachments: &[crate::llm::Attachment],
1354 history: Option<&[Message]>,
1355 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1356 let (tx, rx) = mpsc::channel(256);
1357 let mut effective_history = match history {
1358 Some(h) => h.to_vec(),
1359 None => read_or_recover(&self.history).clone(),
1360 };
1361 effective_history.push(Message::user_with_attachments(prompt, attachments));
1362
1363 let agent_loop = self.build_agent_loop();
1364 let handle = tokio::spawn(async move {
1365 let _ = agent_loop
1366 .execute_from_messages(effective_history, None, Some(tx))
1367 .await;
1368 });
1369
1370 Ok((rx, handle))
1371 }
1372
1373 pub async fn stream(
1383 &self,
1384 prompt: &str,
1385 history: Option<&[Message]>,
1386 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1387 if CommandRegistry::is_command(prompt) {
1389 let ctx = self.build_command_context();
1390 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1391 let (tx, rx) = mpsc::channel(256);
1392 let handle = tokio::spawn(async move {
1393 let _ = tx
1394 .send(AgentEvent::TextDelta {
1395 text: output.text.clone(),
1396 })
1397 .await;
1398 let _ = tx
1399 .send(AgentEvent::End {
1400 text: output.text.clone(),
1401 usage: crate::llm::TokenUsage::default(),
1402 })
1403 .await;
1404 });
1405 return Ok((rx, handle));
1406 }
1407 }
1408
1409 let (tx, rx) = mpsc::channel(256);
1410 let agent_loop = self.build_agent_loop();
1411 let effective_history = match history {
1412 Some(h) => h.to_vec(),
1413 None => read_or_recover(&self.history).clone(),
1414 };
1415 let prompt = prompt.to_string();
1416
1417 let handle = tokio::spawn(async move {
1418 let _ = agent_loop
1419 .execute(&effective_history, &prompt, Some(tx))
1420 .await;
1421 });
1422
1423 Ok((rx, handle))
1424 }
1425
1426 pub fn history(&self) -> Vec<Message> {
1428 read_or_recover(&self.history).clone()
1429 }
1430
1431 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1433 self.memory.as_ref()
1434 }
1435
1436 pub fn id(&self) -> &str {
1438 &self.session_id
1439 }
1440
1441 pub fn workspace(&self) -> &std::path::Path {
1443 &self.workspace
1444 }
1445
1446 pub fn init_warning(&self) -> Option<&str> {
1448 self.init_warning.as_deref()
1449 }
1450
1451 pub fn session_id(&self) -> &str {
1453 &self.session_id
1454 }
1455
1456 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
1462 self.tool_executor.definitions()
1463 }
1464
1465 pub fn tool_names(&self) -> Vec<String> {
1471 self.tool_executor
1472 .definitions()
1473 .into_iter()
1474 .map(|t| t.name)
1475 .collect()
1476 }
1477
1478 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1484 self.hook_engine.register(hook);
1485 }
1486
1487 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1489 self.hook_engine.unregister(hook_id)
1490 }
1491
1492 pub fn register_hook_handler(
1494 &self,
1495 hook_id: &str,
1496 handler: Arc<dyn crate::hooks::HookHandler>,
1497 ) {
1498 self.hook_engine.register_handler(hook_id, handler);
1499 }
1500
1501 pub fn unregister_hook_handler(&self, hook_id: &str) {
1503 self.hook_engine.unregister_handler(hook_id);
1504 }
1505
1506 pub fn hook_count(&self) -> usize {
1508 self.hook_engine.hook_count()
1509 }
1510
1511 pub async fn save(&self) -> Result<()> {
1515 let store = match &self.session_store {
1516 Some(s) => s,
1517 None => return Ok(()),
1518 };
1519
1520 let history = read_or_recover(&self.history).clone();
1521 let now = chrono::Utc::now().timestamp();
1522
1523 let data = crate::store::SessionData {
1524 id: self.session_id.clone(),
1525 config: crate::session::SessionConfig {
1526 name: String::new(),
1527 workspace: self.workspace.display().to_string(),
1528 system_prompt: Some(self.config.prompt_slots.build()),
1529 max_context_length: 200_000,
1530 auto_compact: false,
1531 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1532 storage_type: crate::config::StorageBackend::File,
1533 queue_config: None,
1534 confirmation_policy: None,
1535 permission_policy: None,
1536 parent_id: None,
1537 security_config: None,
1538 hook_engine: None,
1539 planning_enabled: self.config.planning_enabled,
1540 goal_tracking: self.config.goal_tracking,
1541 },
1542 state: crate::session::SessionState::Active,
1543 messages: history,
1544 context_usage: crate::session::ContextUsage::default(),
1545 total_usage: crate::llm::TokenUsage::default(),
1546 total_cost: 0.0,
1547 model_name: None,
1548 cost_records: Vec::new(),
1549 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1550 thinking_enabled: false,
1551 thinking_budget: None,
1552 created_at: now,
1553 updated_at: now,
1554 llm_config: None,
1555 tasks: Vec::new(),
1556 parent_id: None,
1557 };
1558
1559 store.save(&data).await?;
1560 tracing::debug!("Session {} saved", self.session_id);
1561 Ok(())
1562 }
1563
1564 pub async fn read_file(&self, path: &str) -> Result<String> {
1566 let args = serde_json::json!({ "file_path": path });
1567 let result = self.tool_executor.execute("read", &args).await?;
1568 Ok(result.output)
1569 }
1570
1571 pub async fn bash(&self, command: &str) -> Result<String> {
1576 let args = serde_json::json!({ "command": command });
1577 let result = self
1578 .tool_executor
1579 .execute_with_context("bash", &args, &self.tool_context)
1580 .await?;
1581 Ok(result.output)
1582 }
1583
1584 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1586 let args = serde_json::json!({ "pattern": pattern });
1587 let result = self.tool_executor.execute("glob", &args).await?;
1588 let files: Vec<String> = result
1589 .output
1590 .lines()
1591 .filter(|l| !l.is_empty())
1592 .map(|l| l.to_string())
1593 .collect();
1594 Ok(files)
1595 }
1596
1597 pub async fn grep(&self, pattern: &str) -> Result<String> {
1599 let args = serde_json::json!({ "pattern": pattern });
1600 let result = self.tool_executor.execute("grep", &args).await?;
1601 Ok(result.output)
1602 }
1603
1604 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1606 let result = self.tool_executor.execute(name, &args).await?;
1607 Ok(ToolCallResult {
1608 name: name.to_string(),
1609 output: result.output,
1610 exit_code: result.exit_code,
1611 })
1612 }
1613
1614 pub fn has_queue(&self) -> bool {
1620 self.command_queue.is_some()
1621 }
1622
1623 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1627 if let Some(ref queue) = self.command_queue {
1628 queue.set_lane_handler(lane, config).await;
1629 }
1630 }
1631
1632 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1636 if let Some(ref queue) = self.command_queue {
1637 queue.complete_external_task(task_id, result).await
1638 } else {
1639 false
1640 }
1641 }
1642
1643 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1645 if let Some(ref queue) = self.command_queue {
1646 queue.pending_external_tasks().await
1647 } else {
1648 Vec::new()
1649 }
1650 }
1651
1652 pub async fn queue_stats(&self) -> SessionQueueStats {
1654 if let Some(ref queue) = self.command_queue {
1655 queue.stats().await
1656 } else {
1657 SessionQueueStats::default()
1658 }
1659 }
1660
1661 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1663 if let Some(ref queue) = self.command_queue {
1664 queue.metrics_snapshot().await
1665 } else {
1666 None
1667 }
1668 }
1669
1670 pub async fn submit(
1676 &self,
1677 lane: SessionLane,
1678 command: Box<dyn crate::queue::SessionCommand>,
1679 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1680 let queue = self
1681 .command_queue
1682 .as_ref()
1683 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1684 Ok(queue.submit(lane, command).await)
1685 }
1686
1687 pub async fn submit_batch(
1694 &self,
1695 lane: SessionLane,
1696 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1697 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
1698 {
1699 let queue = self
1700 .command_queue
1701 .as_ref()
1702 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1703 Ok(queue.submit_batch(lane, commands).await)
1704 }
1705
1706 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1708 if let Some(ref queue) = self.command_queue {
1709 queue.dead_letters().await
1710 } else {
1711 Vec::new()
1712 }
1713 }
1714
1715 pub async fn add_mcp_server(
1726 &self,
1727 config: crate::mcp::McpServerConfig,
1728 ) -> crate::error::Result<usize> {
1729 let server_name = config.name.clone();
1730 self.mcp_manager.register_server(config).await;
1731 self.mcp_manager.connect(&server_name).await.map_err(|e| {
1732 crate::error::CodeError::Tool {
1733 tool: server_name.clone(),
1734 message: format!("Failed to connect MCP server: {}", e),
1735 }
1736 })?;
1737
1738 let tools = self.mcp_manager.get_server_tools(&server_name).await;
1739 let count = tools.len();
1740
1741 for tool in
1742 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
1743 {
1744 self.tool_executor.register_dynamic_tool(tool);
1745 }
1746
1747 tracing::info!(
1748 session_id = %self.session_id,
1749 server = server_name,
1750 tools = count,
1751 "MCP server added to live session"
1752 );
1753
1754 Ok(count)
1755 }
1756
1757 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
1762 self.tool_executor
1763 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
1764 self.mcp_manager
1765 .disconnect(server_name)
1766 .await
1767 .map_err(|e| crate::error::CodeError::Tool {
1768 tool: server_name.to_string(),
1769 message: format!("Failed to disconnect MCP server: {}", e),
1770 })?;
1771 tracing::info!(
1772 session_id = %self.session_id,
1773 server = server_name,
1774 "MCP server removed from live session"
1775 );
1776 Ok(())
1777 }
1778
1779 pub async fn mcp_status(
1781 &self,
1782 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
1783 self.mcp_manager.get_status().await
1784 }
1785}
1786
1787#[cfg(test)]
1792mod tests {
1793 use super::*;
1794 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1795 use crate::store::SessionStore;
1796
1797 #[tokio::test]
1798 async fn test_session_submit_no_queue_returns_err() {
1799 let agent = Agent::from_config(test_config()).await.unwrap();
1800 let session = agent.session(".", None).unwrap();
1801 struct Noop;
1802 #[async_trait::async_trait]
1803 impl crate::queue::SessionCommand for Noop {
1804 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1805 Ok(serde_json::json!(null))
1806 }
1807 fn command_type(&self) -> &str {
1808 "noop"
1809 }
1810 }
1811 let result: anyhow::Result<
1812 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
1813 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
1814 assert!(result.is_err());
1815 assert!(result.unwrap_err().to_string().contains("No queue"));
1816 }
1817
1818 #[tokio::test]
1819 async fn test_session_submit_batch_no_queue_returns_err() {
1820 let agent = Agent::from_config(test_config()).await.unwrap();
1821 let session = agent.session(".", None).unwrap();
1822 struct Noop;
1823 #[async_trait::async_trait]
1824 impl crate::queue::SessionCommand for Noop {
1825 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1826 Ok(serde_json::json!(null))
1827 }
1828 fn command_type(&self) -> &str {
1829 "noop"
1830 }
1831 }
1832 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1833 let result: anyhow::Result<
1834 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
1835 > = session.submit_batch(SessionLane::Query, cmds).await;
1836 assert!(result.is_err());
1837 assert!(result.unwrap_err().to_string().contains("No queue"));
1838 }
1839
1840 fn test_config() -> CodeConfig {
1841 CodeConfig {
1842 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1843 providers: vec![
1844 ProviderConfig {
1845 name: "anthropic".to_string(),
1846 api_key: Some("test-key".to_string()),
1847 base_url: None,
1848 models: vec![ModelConfig {
1849 id: "claude-sonnet-4-20250514".to_string(),
1850 name: "Claude Sonnet 4".to_string(),
1851 family: "claude-sonnet".to_string(),
1852 api_key: None,
1853 base_url: None,
1854 attachment: false,
1855 reasoning: false,
1856 tool_call: true,
1857 temperature: true,
1858 release_date: None,
1859 modalities: ModelModalities::default(),
1860 cost: Default::default(),
1861 limit: Default::default(),
1862 }],
1863 },
1864 ProviderConfig {
1865 name: "openai".to_string(),
1866 api_key: Some("test-openai-key".to_string()),
1867 base_url: None,
1868 models: vec![ModelConfig {
1869 id: "gpt-4o".to_string(),
1870 name: "GPT-4o".to_string(),
1871 family: "gpt-4".to_string(),
1872 api_key: None,
1873 base_url: None,
1874 attachment: false,
1875 reasoning: false,
1876 tool_call: true,
1877 temperature: true,
1878 release_date: None,
1879 modalities: ModelModalities::default(),
1880 cost: Default::default(),
1881 limit: Default::default(),
1882 }],
1883 },
1884 ],
1885 ..Default::default()
1886 }
1887 }
1888
1889 #[tokio::test]
1890 async fn test_from_config() {
1891 let agent = Agent::from_config(test_config()).await;
1892 assert!(agent.is_ok());
1893 }
1894
1895 #[tokio::test]
1896 async fn test_session_default() {
1897 let agent = Agent::from_config(test_config()).await.unwrap();
1898 let session = agent.session("/tmp/test-workspace", None);
1899 assert!(session.is_ok());
1900 let debug = format!("{:?}", session.unwrap());
1901 assert!(debug.contains("AgentSession"));
1902 }
1903
1904 #[tokio::test]
1905 async fn test_session_with_model_override() {
1906 let agent = Agent::from_config(test_config()).await.unwrap();
1907 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1908 let session = agent.session("/tmp/test-workspace", Some(opts));
1909 assert!(session.is_ok());
1910 }
1911
1912 #[tokio::test]
1913 async fn test_session_with_invalid_model_format() {
1914 let agent = Agent::from_config(test_config()).await.unwrap();
1915 let opts = SessionOptions::new().with_model("gpt-4o");
1916 let session = agent.session("/tmp/test-workspace", Some(opts));
1917 assert!(session.is_err());
1918 }
1919
1920 #[tokio::test]
1921 async fn test_session_with_model_not_found() {
1922 let agent = Agent::from_config(test_config()).await.unwrap();
1923 let opts = SessionOptions::new().with_model("openai/nonexistent");
1924 let session = agent.session("/tmp/test-workspace", Some(opts));
1925 assert!(session.is_err());
1926 }
1927
1928 #[tokio::test]
1929 async fn test_new_with_hcl_string() {
1930 let hcl = r#"
1931 default_model = "anthropic/claude-sonnet-4-20250514"
1932 providers {
1933 name = "anthropic"
1934 api_key = "test-key"
1935 models {
1936 id = "claude-sonnet-4-20250514"
1937 name = "Claude Sonnet 4"
1938 }
1939 }
1940 "#;
1941 let agent = Agent::new(hcl).await;
1942 assert!(agent.is_ok());
1943 }
1944
1945 #[tokio::test]
1946 async fn test_create_alias_hcl() {
1947 let hcl = r#"
1948 default_model = "anthropic/claude-sonnet-4-20250514"
1949 providers {
1950 name = "anthropic"
1951 api_key = "test-key"
1952 models {
1953 id = "claude-sonnet-4-20250514"
1954 name = "Claude Sonnet 4"
1955 }
1956 }
1957 "#;
1958 let agent = Agent::create(hcl).await;
1959 assert!(agent.is_ok());
1960 }
1961
1962 #[tokio::test]
1963 async fn test_create_and_new_produce_same_result() {
1964 let hcl = r#"
1965 default_model = "anthropic/claude-sonnet-4-20250514"
1966 providers {
1967 name = "anthropic"
1968 api_key = "test-key"
1969 models {
1970 id = "claude-sonnet-4-20250514"
1971 name = "Claude Sonnet 4"
1972 }
1973 }
1974 "#;
1975 let agent_new = Agent::new(hcl).await;
1976 let agent_create = Agent::create(hcl).await;
1977 assert!(agent_new.is_ok());
1978 assert!(agent_create.is_ok());
1979
1980 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
1982 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
1983 assert!(session_new.is_ok());
1984 assert!(session_create.is_ok());
1985 }
1986
1987 #[test]
1988 fn test_from_config_requires_default_model() {
1989 let rt = tokio::runtime::Runtime::new().unwrap();
1990 let config = CodeConfig {
1991 providers: vec![ProviderConfig {
1992 name: "anthropic".to_string(),
1993 api_key: Some("test-key".to_string()),
1994 base_url: None,
1995 models: vec![],
1996 }],
1997 ..Default::default()
1998 };
1999 let result = rt.block_on(Agent::from_config(config));
2000 assert!(result.is_err());
2001 }
2002
2003 #[tokio::test]
2004 async fn test_history_empty_on_new_session() {
2005 let agent = Agent::from_config(test_config()).await.unwrap();
2006 let session = agent.session("/tmp/test-workspace", None).unwrap();
2007 assert!(session.history().is_empty());
2008 }
2009
2010 #[tokio::test]
2011 async fn test_session_options_with_agent_dir() {
2012 let opts = SessionOptions::new()
2013 .with_agent_dir("/tmp/agents")
2014 .with_agent_dir("/tmp/more-agents");
2015 assert_eq!(opts.agent_dirs.len(), 2);
2016 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
2017 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
2018 }
2019
2020 #[test]
2025 fn test_session_options_with_queue_config() {
2026 let qc = SessionQueueConfig::default().with_lane_features();
2027 let opts = SessionOptions::new().with_queue_config(qc.clone());
2028 assert!(opts.queue_config.is_some());
2029
2030 let config = opts.queue_config.unwrap();
2031 assert!(config.enable_dlq);
2032 assert!(config.enable_metrics);
2033 assert!(config.enable_alerts);
2034 assert_eq!(config.default_timeout_ms, Some(60_000));
2035 }
2036
2037 #[tokio::test(flavor = "multi_thread")]
2038 async fn test_session_with_queue_config() {
2039 let agent = Agent::from_config(test_config()).await.unwrap();
2040 let qc = SessionQueueConfig::default();
2041 let opts = SessionOptions::new().with_queue_config(qc);
2042 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
2043 assert!(session.is_ok());
2044 let session = session.unwrap();
2045 assert!(session.has_queue());
2046 }
2047
2048 #[tokio::test]
2049 async fn test_session_without_queue_config() {
2050 let agent = Agent::from_config(test_config()).await.unwrap();
2051 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
2052 assert!(!session.has_queue());
2053 }
2054
2055 #[tokio::test]
2056 async fn test_session_queue_stats_without_queue() {
2057 let agent = Agent::from_config(test_config()).await.unwrap();
2058 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
2059 let stats = session.queue_stats().await;
2060 assert_eq!(stats.total_pending, 0);
2062 assert_eq!(stats.total_active, 0);
2063 }
2064
2065 #[tokio::test(flavor = "multi_thread")]
2066 async fn test_session_queue_stats_with_queue() {
2067 let agent = Agent::from_config(test_config()).await.unwrap();
2068 let qc = SessionQueueConfig::default();
2069 let opts = SessionOptions::new().with_queue_config(qc);
2070 let session = agent
2071 .session("/tmp/test-workspace-qstats", Some(opts))
2072 .unwrap();
2073 let stats = session.queue_stats().await;
2074 assert_eq!(stats.total_pending, 0);
2076 assert_eq!(stats.total_active, 0);
2077 }
2078
2079 #[tokio::test(flavor = "multi_thread")]
2080 async fn test_session_pending_external_tasks_empty() {
2081 let agent = Agent::from_config(test_config()).await.unwrap();
2082 let qc = SessionQueueConfig::default();
2083 let opts = SessionOptions::new().with_queue_config(qc);
2084 let session = agent
2085 .session("/tmp/test-workspace-ext", Some(opts))
2086 .unwrap();
2087 let tasks = session.pending_external_tasks().await;
2088 assert!(tasks.is_empty());
2089 }
2090
2091 #[tokio::test(flavor = "multi_thread")]
2092 async fn test_session_dead_letters_empty() {
2093 let agent = Agent::from_config(test_config()).await.unwrap();
2094 let qc = SessionQueueConfig::default().with_dlq(Some(100));
2095 let opts = SessionOptions::new().with_queue_config(qc);
2096 let session = agent
2097 .session("/tmp/test-workspace-dlq", Some(opts))
2098 .unwrap();
2099 let dead = session.dead_letters().await;
2100 assert!(dead.is_empty());
2101 }
2102
2103 #[tokio::test(flavor = "multi_thread")]
2104 async fn test_session_queue_metrics_disabled() {
2105 let agent = Agent::from_config(test_config()).await.unwrap();
2106 let qc = SessionQueueConfig::default();
2108 let opts = SessionOptions::new().with_queue_config(qc);
2109 let session = agent
2110 .session("/tmp/test-workspace-nomet", Some(opts))
2111 .unwrap();
2112 let metrics = session.queue_metrics().await;
2113 assert!(metrics.is_none());
2114 }
2115
2116 #[tokio::test(flavor = "multi_thread")]
2117 async fn test_session_queue_metrics_enabled() {
2118 let agent = Agent::from_config(test_config()).await.unwrap();
2119 let qc = SessionQueueConfig::default().with_metrics();
2120 let opts = SessionOptions::new().with_queue_config(qc);
2121 let session = agent
2122 .session("/tmp/test-workspace-met", Some(opts))
2123 .unwrap();
2124 let metrics = session.queue_metrics().await;
2125 assert!(metrics.is_some());
2126 }
2127
2128 #[tokio::test(flavor = "multi_thread")]
2129 async fn test_session_set_lane_handler() {
2130 let agent = Agent::from_config(test_config()).await.unwrap();
2131 let qc = SessionQueueConfig::default();
2132 let opts = SessionOptions::new().with_queue_config(qc);
2133 let session = agent
2134 .session("/tmp/test-workspace-handler", Some(opts))
2135 .unwrap();
2136
2137 session
2139 .set_lane_handler(
2140 SessionLane::Execute,
2141 LaneHandlerConfig {
2142 mode: crate::queue::TaskHandlerMode::External,
2143 timeout_ms: 30_000,
2144 },
2145 )
2146 .await;
2147
2148 }
2151
2152 #[tokio::test(flavor = "multi_thread")]
2157 async fn test_session_has_id() {
2158 let agent = Agent::from_config(test_config()).await.unwrap();
2159 let session = agent.session("/tmp/test-ws-id", None).unwrap();
2160 assert!(!session.session_id().is_empty());
2162 assert_eq!(session.session_id().len(), 36); }
2164
2165 #[tokio::test(flavor = "multi_thread")]
2166 async fn test_session_explicit_id() {
2167 let agent = Agent::from_config(test_config()).await.unwrap();
2168 let opts = SessionOptions::new().with_session_id("my-session-42");
2169 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
2170 assert_eq!(session.session_id(), "my-session-42");
2171 }
2172
2173 #[tokio::test(flavor = "multi_thread")]
2174 async fn test_session_save_no_store() {
2175 let agent = Agent::from_config(test_config()).await.unwrap();
2176 let session = agent.session("/tmp/test-ws-save", None).unwrap();
2177 session.save().await.unwrap();
2179 }
2180
2181 #[tokio::test(flavor = "multi_thread")]
2182 async fn test_session_save_and_load() {
2183 let store = Arc::new(crate::store::MemorySessionStore::new());
2184 let agent = Agent::from_config(test_config()).await.unwrap();
2185
2186 let opts = SessionOptions::new()
2187 .with_session_store(store.clone())
2188 .with_session_id("persist-test");
2189 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
2190
2191 session.save().await.unwrap();
2193
2194 assert!(store.exists("persist-test").await.unwrap());
2196
2197 let data = store.load("persist-test").await.unwrap().unwrap();
2198 assert_eq!(data.id, "persist-test");
2199 assert!(data.messages.is_empty());
2200 }
2201
2202 #[tokio::test(flavor = "multi_thread")]
2203 async fn test_session_save_with_history() {
2204 let store = Arc::new(crate::store::MemorySessionStore::new());
2205 let agent = Agent::from_config(test_config()).await.unwrap();
2206
2207 let opts = SessionOptions::new()
2208 .with_session_store(store.clone())
2209 .with_session_id("history-test");
2210 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
2211
2212 {
2214 let mut h = session.history.write().unwrap();
2215 h.push(Message::user("Hello"));
2216 h.push(Message::user("How are you?"));
2217 }
2218
2219 session.save().await.unwrap();
2220
2221 let data = store.load("history-test").await.unwrap().unwrap();
2222 assert_eq!(data.messages.len(), 2);
2223 }
2224
2225 #[tokio::test(flavor = "multi_thread")]
2226 async fn test_resume_session() {
2227 let store = Arc::new(crate::store::MemorySessionStore::new());
2228 let agent = Agent::from_config(test_config()).await.unwrap();
2229
2230 let opts = SessionOptions::new()
2232 .with_session_store(store.clone())
2233 .with_session_id("resume-test");
2234 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
2235 {
2236 let mut h = session.history.write().unwrap();
2237 h.push(Message::user("What is Rust?"));
2238 h.push(Message::user("Tell me more"));
2239 }
2240 session.save().await.unwrap();
2241
2242 let opts2 = SessionOptions::new().with_session_store(store.clone());
2244 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2245
2246 assert_eq!(resumed.session_id(), "resume-test");
2247 let history = resumed.history();
2248 assert_eq!(history.len(), 2);
2249 assert_eq!(history[0].text(), "What is Rust?");
2250 }
2251
2252 #[tokio::test(flavor = "multi_thread")]
2253 async fn test_resume_session_not_found() {
2254 let store = Arc::new(crate::store::MemorySessionStore::new());
2255 let agent = Agent::from_config(test_config()).await.unwrap();
2256
2257 let opts = SessionOptions::new().with_session_store(store.clone());
2258 let result = agent.resume_session("nonexistent", opts);
2259 assert!(result.is_err());
2260 assert!(result.unwrap_err().to_string().contains("not found"));
2261 }
2262
2263 #[tokio::test(flavor = "multi_thread")]
2264 async fn test_resume_session_no_store() {
2265 let agent = Agent::from_config(test_config()).await.unwrap();
2266 let opts = SessionOptions::new();
2267 let result = agent.resume_session("any-id", opts);
2268 assert!(result.is_err());
2269 assert!(result.unwrap_err().to_string().contains("session_store"));
2270 }
2271
2272 #[tokio::test(flavor = "multi_thread")]
2273 async fn test_file_session_store_persistence() {
2274 let dir = tempfile::TempDir::new().unwrap();
2275 let store = Arc::new(
2276 crate::store::FileSessionStore::new(dir.path())
2277 .await
2278 .unwrap(),
2279 );
2280 let agent = Agent::from_config(test_config()).await.unwrap();
2281
2282 let opts = SessionOptions::new()
2284 .with_session_store(store.clone())
2285 .with_session_id("file-persist");
2286 let session = agent
2287 .session("/tmp/test-ws-file-persist", Some(opts))
2288 .unwrap();
2289 {
2290 let mut h = session.history.write().unwrap();
2291 h.push(Message::user("test message"));
2292 }
2293 session.save().await.unwrap();
2294
2295 let store2 = Arc::new(
2297 crate::store::FileSessionStore::new(dir.path())
2298 .await
2299 .unwrap(),
2300 );
2301 let data = store2.load("file-persist").await.unwrap().unwrap();
2302 assert_eq!(data.messages.len(), 1);
2303 }
2304
2305 #[tokio::test(flavor = "multi_thread")]
2306 async fn test_session_options_builders() {
2307 let opts = SessionOptions::new()
2308 .with_session_id("test-id")
2309 .with_auto_save(true);
2310 assert_eq!(opts.session_id, Some("test-id".to_string()));
2311 assert!(opts.auto_save);
2312 }
2313
2314 #[test]
2319 fn test_session_options_with_sandbox_sets_config() {
2320 use crate::sandbox::SandboxConfig;
2321 let cfg = SandboxConfig {
2322 image: "ubuntu:22.04".into(),
2323 memory_mb: 1024,
2324 ..SandboxConfig::default()
2325 };
2326 let opts = SessionOptions::new().with_sandbox(cfg);
2327 assert!(opts.sandbox_config.is_some());
2328 let sc = opts.sandbox_config.unwrap();
2329 assert_eq!(sc.image, "ubuntu:22.04");
2330 assert_eq!(sc.memory_mb, 1024);
2331 }
2332
2333 #[test]
2334 fn test_session_options_default_has_no_sandbox() {
2335 let opts = SessionOptions::default();
2336 assert!(opts.sandbox_config.is_none());
2337 }
2338
2339 #[tokio::test]
2340 async fn test_session_debug_includes_sandbox_config() {
2341 use crate::sandbox::SandboxConfig;
2342 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
2343 let debug = format!("{:?}", opts);
2344 assert!(debug.contains("sandbox_config"));
2345 }
2346
2347 #[tokio::test]
2348 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2349 let agent = Agent::from_config(test_config()).await.unwrap();
2352 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2353 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2355 assert!(session.is_ok());
2356 }
2357
2358 #[tokio::test(flavor = "multi_thread")]
2363 async fn test_session_with_memory_store() {
2364 use a3s_memory::InMemoryStore;
2365 let store = Arc::new(InMemoryStore::new());
2366 let agent = Agent::from_config(test_config()).await.unwrap();
2367 let opts = SessionOptions::new().with_memory(store);
2368 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2369 assert!(session.memory().is_some());
2370 }
2371
2372 #[tokio::test(flavor = "multi_thread")]
2373 async fn test_session_without_memory_store() {
2374 let agent = Agent::from_config(test_config()).await.unwrap();
2375 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2376 assert!(session.memory().is_none());
2377 }
2378
2379 #[tokio::test(flavor = "multi_thread")]
2380 async fn test_session_memory_wired_into_config() {
2381 use a3s_memory::InMemoryStore;
2382 let store = Arc::new(InMemoryStore::new());
2383 let agent = Agent::from_config(test_config()).await.unwrap();
2384 let opts = SessionOptions::new().with_memory(store);
2385 let session = agent
2386 .session("/tmp/test-ws-mem-config", Some(opts))
2387 .unwrap();
2388 assert!(session.memory().is_some());
2390 }
2391
2392 #[tokio::test(flavor = "multi_thread")]
2393 async fn test_session_with_file_memory() {
2394 let dir = tempfile::TempDir::new().unwrap();
2395 let agent = Agent::from_config(test_config()).await.unwrap();
2396 let opts = SessionOptions::new().with_file_memory(dir.path());
2397 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2398 assert!(session.memory().is_some());
2399 }
2400
2401 #[tokio::test(flavor = "multi_thread")]
2402 async fn test_memory_remember_and_recall() {
2403 use a3s_memory::InMemoryStore;
2404 let store = Arc::new(InMemoryStore::new());
2405 let agent = Agent::from_config(test_config()).await.unwrap();
2406 let opts = SessionOptions::new().with_memory(store);
2407 let session = agent
2408 .session("/tmp/test-ws-mem-recall", Some(opts))
2409 .unwrap();
2410
2411 let memory = session.memory().unwrap();
2412 memory
2413 .remember_success("write a file", &["write".to_string()], "done")
2414 .await
2415 .unwrap();
2416
2417 let results = memory.recall_similar("write", 5).await.unwrap();
2418 assert!(!results.is_empty());
2419 let stats = memory.stats().await.unwrap();
2420 assert_eq!(stats.long_term_count, 1);
2421 }
2422
2423 #[tokio::test(flavor = "multi_thread")]
2428 async fn test_session_tool_timeout_configured() {
2429 let agent = Agent::from_config(test_config()).await.unwrap();
2430 let opts = SessionOptions::new().with_tool_timeout(5000);
2431 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2432 assert!(!session.id().is_empty());
2433 }
2434
2435 #[tokio::test(flavor = "multi_thread")]
2440 async fn test_session_without_queue_builds_ok() {
2441 let agent = Agent::from_config(test_config()).await.unwrap();
2442 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2443 assert!(!session.id().is_empty());
2444 }
2445
2446 #[tokio::test(flavor = "multi_thread")]
2451 async fn test_concurrent_history_reads() {
2452 let agent = Agent::from_config(test_config()).await.unwrap();
2453 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2454
2455 let handles: Vec<_> = (0..10)
2456 .map(|_| {
2457 let s = Arc::clone(&session);
2458 tokio::spawn(async move { s.history().len() })
2459 })
2460 .collect();
2461
2462 for h in handles {
2463 h.await.unwrap();
2464 }
2465 }
2466
2467 #[tokio::test(flavor = "multi_thread")]
2472 async fn test_session_no_init_warning_without_file_memory() {
2473 let agent = Agent::from_config(test_config()).await.unwrap();
2474 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2475 assert!(session.init_warning().is_none());
2476 }
2477
2478 #[tokio::test(flavor = "multi_thread")]
2479 async fn test_session_with_mcp_manager_builds_ok() {
2480 use crate::mcp::manager::McpManager;
2481 let mcp = Arc::new(McpManager::new());
2482 let agent = Agent::from_config(test_config()).await.unwrap();
2483 let opts = SessionOptions::new().with_mcp(mcp);
2484 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2486 assert!(!session.id().is_empty());
2487 }
2488
2489 #[test]
2490 fn test_session_command_is_pub() {
2491 use crate::SessionCommand;
2493 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
2494 }
2495
2496 #[tokio::test(flavor = "multi_thread")]
2497 async fn test_session_submit_with_queue_executes() {
2498 let agent = Agent::from_config(test_config()).await.unwrap();
2499 let qc = SessionQueueConfig::default();
2500 let opts = SessionOptions::new().with_queue_config(qc);
2501 let session = agent
2502 .session("/tmp/test-ws-submit-exec", Some(opts))
2503 .unwrap();
2504
2505 struct Echo(serde_json::Value);
2506 #[async_trait::async_trait]
2507 impl crate::queue::SessionCommand for Echo {
2508 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2509 Ok(self.0.clone())
2510 }
2511 fn command_type(&self) -> &str {
2512 "echo"
2513 }
2514 }
2515
2516 let rx = session
2517 .submit(
2518 SessionLane::Query,
2519 Box::new(Echo(serde_json::json!({"ok": true}))),
2520 )
2521 .await
2522 .expect("submit should succeed with queue configured");
2523
2524 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2525 .await
2526 .expect("timed out waiting for command result")
2527 .expect("channel closed before result")
2528 .expect("command returned an error");
2529
2530 assert_eq!(result["ok"], true);
2531 }
2532}