1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::Result;
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
39#[derive(Debug, Clone)]
45pub struct ToolCallResult {
46 pub name: String,
47 pub output: String,
48 pub exit_code: i32,
49}
50
51#[derive(Clone, Default)]
57pub struct SessionOptions {
58 pub model: Option<String>,
60 pub agent_dirs: Vec<PathBuf>,
63 pub queue_config: Option<SessionQueueConfig>,
68 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
70 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
72 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
74 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
76 pub planning_enabled: bool,
78 pub goal_tracking: bool,
80 pub skill_dirs: Vec<PathBuf>,
83 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
85 pub memory_store: Option<Arc<dyn MemoryStore>>,
87 pub(crate) file_memory_dir: Option<PathBuf>,
89 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
91 pub session_id: Option<String>,
93 pub auto_save: bool,
95 pub max_parse_retries: Option<u32>,
98 pub tool_timeout_ms: Option<u64>,
101 pub circuit_breaker_threshold: Option<u32>,
105 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
111 pub auto_compact: bool,
113 pub auto_compact_threshold: Option<f32>,
116 pub continuation_enabled: Option<bool>,
119 pub max_continuation_turns: Option<u32>,
122 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
127 pub temperature: Option<f32>,
129 pub thinking_budget: Option<usize>,
131 pub prompt_slots: Option<SystemPromptSlots>,
137}
138
139impl std::fmt::Debug for SessionOptions {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("SessionOptions")
142 .field("model", &self.model)
143 .field("agent_dirs", &self.agent_dirs)
144 .field("skill_dirs", &self.skill_dirs)
145 .field("queue_config", &self.queue_config)
146 .field("security_provider", &self.security_provider.is_some())
147 .field("context_providers", &self.context_providers.len())
148 .field("confirmation_manager", &self.confirmation_manager.is_some())
149 .field("permission_checker", &self.permission_checker.is_some())
150 .field("planning_enabled", &self.planning_enabled)
151 .field("goal_tracking", &self.goal_tracking)
152 .field(
153 "skill_registry",
154 &self
155 .skill_registry
156 .as_ref()
157 .map(|r| format!("{} skills", r.len())),
158 )
159 .field("memory_store", &self.memory_store.is_some())
160 .field("session_store", &self.session_store.is_some())
161 .field("session_id", &self.session_id)
162 .field("auto_save", &self.auto_save)
163 .field("max_parse_retries", &self.max_parse_retries)
164 .field("tool_timeout_ms", &self.tool_timeout_ms)
165 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
166 .field("sandbox_config", &self.sandbox_config)
167 .field("auto_compact", &self.auto_compact)
168 .field("auto_compact_threshold", &self.auto_compact_threshold)
169 .field("continuation_enabled", &self.continuation_enabled)
170 .field("max_continuation_turns", &self.max_continuation_turns)
171 .field("mcp_manager", &self.mcp_manager.is_some())
172 .field("temperature", &self.temperature)
173 .field("thinking_budget", &self.thinking_budget)
174 .field("prompt_slots", &self.prompt_slots.is_some())
175 .finish()
176 }
177}
178
179impl SessionOptions {
180 pub fn new() -> Self {
181 Self::default()
182 }
183
184 pub fn with_model(mut self, model: impl Into<String>) -> Self {
185 self.model = Some(model.into());
186 self
187 }
188
189 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
190 self.agent_dirs.push(dir.into());
191 self
192 }
193
194 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
195 self.queue_config = Some(config);
196 self
197 }
198
199 pub fn with_default_security(mut self) -> Self {
201 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
202 self
203 }
204
205 pub fn with_security_provider(
207 mut self,
208 provider: Arc<dyn crate::security::SecurityProvider>,
209 ) -> Self {
210 self.security_provider = Some(provider);
211 self
212 }
213
214 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
216 let config = crate::context::FileSystemContextConfig::new(root_path);
217 self.context_providers
218 .push(Arc::new(crate::context::FileSystemContextProvider::new(
219 config,
220 )));
221 self
222 }
223
224 pub fn with_context_provider(
226 mut self,
227 provider: Arc<dyn crate::context::ContextProvider>,
228 ) -> Self {
229 self.context_providers.push(provider);
230 self
231 }
232
233 pub fn with_confirmation_manager(
235 mut self,
236 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
237 ) -> Self {
238 self.confirmation_manager = Some(manager);
239 self
240 }
241
242 pub fn with_permission_checker(
244 mut self,
245 checker: Arc<dyn crate::permissions::PermissionChecker>,
246 ) -> Self {
247 self.permission_checker = Some(checker);
248 self
249 }
250
251 pub fn with_permissive_policy(self) -> Self {
258 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
259 }
260
261 pub fn with_planning(mut self, enabled: bool) -> Self {
263 self.planning_enabled = enabled;
264 self
265 }
266
267 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
269 self.goal_tracking = enabled;
270 self
271 }
272
273 pub fn with_builtin_skills(mut self) -> Self {
275 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
276 self
277 }
278
279 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
281 self.skill_registry = Some(registry);
282 self
283 }
284
285 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
288 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
289 self
290 }
291
292 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
294 let registry = self
295 .skill_registry
296 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
297 if let Err(e) = registry.load_from_dir(&dir) {
298 tracing::warn!(
299 dir = %dir.as_ref().display(),
300 error = %e,
301 "Failed to load skills from directory — continuing without them"
302 );
303 }
304 self.skill_registry = Some(registry);
305 self
306 }
307
308 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
310 self.memory_store = Some(store);
311 self
312 }
313
314 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
320 self.file_memory_dir = Some(dir.into());
321 self
322 }
323
324 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
326 self.session_store = Some(store);
327 self
328 }
329
330 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
332 let dir = dir.into();
333 match tokio::runtime::Handle::try_current() {
334 Ok(handle) => {
335 match tokio::task::block_in_place(|| {
336 handle.block_on(crate::store::FileSessionStore::new(dir))
337 }) {
338 Ok(store) => {
339 self.session_store =
340 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
341 }
342 Err(e) => {
343 tracing::warn!("Failed to create file session store: {}", e);
344 }
345 }
346 }
347 Err(_) => {
348 tracing::warn!(
349 "No async runtime available for file session store — persistence disabled"
350 );
351 }
352 }
353 self
354 }
355
356 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
358 self.session_id = Some(id.into());
359 self
360 }
361
362 pub fn with_auto_save(mut self, enabled: bool) -> Self {
364 self.auto_save = enabled;
365 self
366 }
367
368 pub fn with_parse_retries(mut self, max: u32) -> Self {
374 self.max_parse_retries = Some(max);
375 self
376 }
377
378 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
384 self.tool_timeout_ms = Some(timeout_ms);
385 self
386 }
387
388 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
394 self.circuit_breaker_threshold = Some(threshold);
395 self
396 }
397
398 pub fn with_resilience_defaults(self) -> Self {
404 self.with_parse_retries(2)
405 .with_tool_timeout(120_000)
406 .with_circuit_breaker(3)
407 }
408
409 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
428 self.sandbox_config = Some(config);
429 self
430 }
431
432 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
437 self.auto_compact = enabled;
438 self
439 }
440
441 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
443 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
444 self
445 }
446
447 pub fn with_continuation(mut self, enabled: bool) -> Self {
452 self.continuation_enabled = Some(enabled);
453 self
454 }
455
456 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
458 self.max_continuation_turns = Some(turns);
459 self
460 }
461
462 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
467 self.mcp_manager = Some(manager);
468 self
469 }
470
471 pub fn with_temperature(mut self, temperature: f32) -> Self {
472 self.temperature = Some(temperature);
473 self
474 }
475
476 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
477 self.thinking_budget = Some(budget);
478 self
479 }
480
481 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
486 self.prompt_slots = Some(slots);
487 self
488 }
489}
490
491pub struct Agent {
500 llm_client: Arc<dyn LlmClient>,
501 code_config: CodeConfig,
502 config: AgentConfig,
503 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
505 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
508}
509
510impl std::fmt::Debug for Agent {
511 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
512 f.debug_struct("Agent").finish()
513 }
514}
515
516impl Agent {
517 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
521 let source = config_source.into();
522
523 let expanded = if let Some(rest) = source.strip_prefix("~/") {
525 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
526 if let Some(home) = home {
527 format!("{}/{}", home.to_string_lossy(), rest)
528 } else {
529 source.clone()
530 }
531 } else {
532 source.clone()
533 };
534
535 let path = Path::new(&expanded);
536
537 let config = if path.extension().is_some() && path.exists() {
538 CodeConfig::from_file(path)
539 .with_context(|| format!("Failed to load config: {}", path.display()))?
540 } else {
541 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
543 };
544
545 Self::from_config(config).await
546 }
547
548 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
553 Self::new(config_source).await
554 }
555
556 pub async fn from_config(config: CodeConfig) -> Result<Self> {
558 let llm_config = config
559 .default_llm_config()
560 .context("default_model must be set in 'provider/model' format with a valid API key")?;
561 let llm_client = crate::llm::create_client_with_config(llm_config);
562
563 let agent_config = AgentConfig {
564 max_tool_rounds: config
565 .max_tool_rounds
566 .unwrap_or(AgentConfig::default().max_tool_rounds),
567 ..AgentConfig::default()
568 };
569
570 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
572 (None, vec![])
573 } else {
574 let manager = Arc::new(crate::mcp::manager::McpManager::new());
575 for server in &config.mcp_servers {
576 if !server.enabled {
577 continue;
578 }
579 manager.register_server(server.clone()).await;
580 if let Err(e) = manager.connect(&server.name).await {
581 tracing::warn!(
582 server = %server.name,
583 error = %e,
584 "Failed to connect to MCP server — skipping"
585 );
586 }
587 }
588 let tools = manager.get_all_tools().await;
590 (Some(manager), tools)
591 };
592
593 let mut agent = Agent {
594 llm_client,
595 code_config: config,
596 config: agent_config,
597 global_mcp,
598 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
599 };
600
601 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
603 for dir in &agent.code_config.skill_dirs.clone() {
604 if let Err(e) = registry.load_from_dir(dir) {
605 tracing::warn!(
606 dir = %dir.display(),
607 error = %e,
608 "Failed to load skills from directory — skipping"
609 );
610 }
611 }
612 agent.config.skill_registry = Some(registry);
613
614 Ok(agent)
615 }
616
617 pub async fn refresh_mcp_tools(&self) -> Result<()> {
625 if let Some(ref mcp) = self.global_mcp {
626 let fresh = mcp.get_all_tools().await;
627 *self
628 .global_mcp_tools
629 .lock()
630 .expect("global_mcp_tools lock poisoned") = fresh;
631 }
632 Ok(())
633 }
634
635 pub fn session(
640 &self,
641 workspace: impl Into<String>,
642 options: Option<SessionOptions>,
643 ) -> Result<AgentSession> {
644 let opts = options.unwrap_or_default();
645
646 let llm_client = if let Some(ref model) = opts.model {
647 let (provider_name, model_id) = model
648 .split_once('/')
649 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
650
651 let mut llm_config = self
652 .code_config
653 .llm_config(provider_name, model_id)
654 .with_context(|| {
655 format!("provider '{provider_name}' or model '{model_id}' not found in config")
656 })?;
657
658 if let Some(temp) = opts.temperature {
659 llm_config = llm_config.with_temperature(temp);
660 }
661 if let Some(budget) = opts.thinking_budget {
662 llm_config = llm_config.with_thinking_budget(budget);
663 }
664
665 crate::llm::create_client_with_config(llm_config)
666 } else {
667 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
668 tracing::warn!(
669 "temperature/thinking_budget set without model override — these will be ignored. \
670 Use with_model() to apply LLM parameter overrides."
671 );
672 }
673 self.llm_client.clone()
674 };
675
676 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
679 (Some(global), Some(session)) => {
680 let global = Arc::clone(global);
681 let session_mgr = Arc::clone(session);
682 match tokio::runtime::Handle::try_current() {
683 Ok(handle) => {
684 tokio::task::block_in_place(|| {
685 handle.block_on(async move {
686 for config in session_mgr.all_configs().await {
687 let name = config.name.clone();
688 global.register_server(config).await;
689 if let Err(e) = global.connect(&name).await {
690 tracing::warn!(
691 server = %name,
692 error = %e,
693 "Failed to connect session-level MCP server — skipping"
694 );
695 }
696 }
697 })
698 });
699 }
700 Err(_) => {
701 tracing::warn!(
702 "No async runtime available to merge session-level MCP servers \
703 into global manager — session MCP servers will not be available"
704 );
705 }
706 }
707 SessionOptions {
708 mcp_manager: Some(Arc::clone(self.global_mcp.as_ref().unwrap())),
709 ..opts
710 }
711 }
712 (Some(global), None) => SessionOptions {
713 mcp_manager: Some(Arc::clone(global)),
714 ..opts
715 },
716 _ => opts,
717 };
718
719 self.build_session(workspace.into(), llm_client, &merged_opts)
720 }
721
722 pub fn resume_session(
730 &self,
731 session_id: &str,
732 options: SessionOptions,
733 ) -> Result<AgentSession> {
734 let store = options.session_store.as_ref().ok_or_else(|| {
735 crate::error::CodeError::Session(
736 "resume_session requires a session_store in SessionOptions".to_string(),
737 )
738 })?;
739
740 let data = match tokio::runtime::Handle::try_current() {
742 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
743 .map_err(|e| {
744 crate::error::CodeError::Session(format!(
745 "Failed to load session {}: {}",
746 session_id, e
747 ))
748 })?,
749 Err(_) => {
750 return Err(crate::error::CodeError::Session(
751 "No async runtime available for session resume".to_string(),
752 ))
753 }
754 };
755
756 let data = data.ok_or_else(|| {
757 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
758 })?;
759
760 let mut opts = options;
762 opts.session_id = Some(data.id.clone());
763
764 let llm_client = if let Some(ref model) = opts.model {
765 let (provider_name, model_id) = model
766 .split_once('/')
767 .context("model format must be 'provider/model'")?;
768 let llm_config = self
769 .code_config
770 .llm_config(provider_name, model_id)
771 .with_context(|| {
772 format!("provider '{provider_name}' or model '{model_id}' not found")
773 })?;
774 crate::llm::create_client_with_config(llm_config)
775 } else {
776 self.llm_client.clone()
777 };
778
779 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
780
781 *session.history.write().unwrap() = data.messages;
783
784 Ok(session)
785 }
786
787 fn build_session(
788 &self,
789 workspace: String,
790 llm_client: Arc<dyn LlmClient>,
791 opts: &SessionOptions,
792 ) -> Result<AgentSession> {
793 let canonical =
794 std::fs::canonicalize(&workspace).unwrap_or_else(|_| PathBuf::from(&workspace));
795
796 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
797
798 {
801 use crate::subagent::{load_agents_from_dir, AgentRegistry};
802 use crate::tools::register_task;
803 let agent_registry = AgentRegistry::new();
804 for dir in self
805 .code_config
806 .agent_dirs
807 .iter()
808 .chain(opts.agent_dirs.iter())
809 {
810 for agent in load_agents_from_dir(dir) {
811 agent_registry.register(agent);
812 }
813 }
814 register_task(
815 tool_executor.registry(),
816 Arc::clone(&llm_client),
817 Arc::new(agent_registry),
818 canonical.display().to_string(),
819 );
820 }
821
822 if let Some(ref mcp) = opts.mcp_manager {
825 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
828 Arc::as_ptr(mcp),
829 self.global_mcp
830 .as_ref()
831 .map(Arc::as_ptr)
832 .unwrap_or(std::ptr::null()),
833 ) {
834 self.global_mcp_tools
836 .lock()
837 .expect("global_mcp_tools lock poisoned")
838 .clone()
839 } else {
840 match tokio::runtime::Handle::try_current() {
842 Ok(handle) => {
843 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
844 }
845 Err(_) => {
846 tracing::warn!(
847 "No async runtime available for session-level MCP tools — \
848 MCP tools will not be registered"
849 );
850 vec![]
851 }
852 }
853 };
854
855 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
856 std::collections::HashMap::new();
857 for (server, tool) in all_tools {
858 by_server.entry(server).or_default().push(tool);
859 }
860 for (server_name, tools) in by_server {
861 for tool in
862 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
863 {
864 tool_executor.register_dynamic_tool(tool);
865 }
866 }
867 }
868
869 let tool_defs = tool_executor.definitions();
870
871 let mut prompt_slots = opts
873 .prompt_slots
874 .clone()
875 .unwrap_or_else(|| self.config.prompt_slots.clone());
876
877 let base_registry = self
881 .config
882 .skill_registry
883 .as_deref()
884 .map(|r| r.fork())
885 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
886 if let Some(ref r) = opts.skill_registry {
888 for skill in r.all() {
889 base_registry.register_unchecked(skill);
890 }
891 }
892 for dir in &opts.skill_dirs {
894 if let Err(e) = base_registry.load_from_dir(dir) {
895 tracing::warn!(
896 dir = %dir.display(),
897 error = %e,
898 "Failed to load session skill dir — skipping"
899 );
900 }
901 }
902 let effective_registry = Arc::new(base_registry);
903
904 let skill_prompt = effective_registry.to_system_prompt();
906 if !skill_prompt.is_empty() {
907 prompt_slots.extra = match prompt_slots.extra {
908 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
909 None => Some(skill_prompt),
910 };
911 }
912
913 let mut init_warning: Option<String> = None;
915 let memory = {
916 let store = if let Some(ref store) = opts.memory_store {
917 Some(Arc::clone(store))
918 } else if let Some(ref dir) = opts.file_memory_dir {
919 match tokio::runtime::Handle::try_current() {
920 Ok(handle) => {
921 let dir = dir.clone();
922 match tokio::task::block_in_place(|| {
923 handle.block_on(FileMemoryStore::new(dir))
924 }) {
925 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
926 Err(e) => {
927 let msg = format!("Failed to create file memory store: {}", e);
928 tracing::warn!("{}", msg);
929 init_warning = Some(msg);
930 None
931 }
932 }
933 }
934 Err(_) => {
935 let msg =
936 "No async runtime available for file memory store — memory disabled"
937 .to_string();
938 tracing::warn!("{}", msg);
939 init_warning = Some(msg);
940 None
941 }
942 }
943 } else {
944 None
945 };
946 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
947 };
948
949 let base = self.config.clone();
950 let config = AgentConfig {
951 prompt_slots,
952 tools: tool_defs,
953 security_provider: opts.security_provider.clone(),
954 permission_checker: opts.permission_checker.clone(),
955 confirmation_manager: opts.confirmation_manager.clone(),
956 context_providers: opts.context_providers.clone(),
957 planning_enabled: opts.planning_enabled,
958 goal_tracking: opts.goal_tracking,
959 skill_registry: Some(effective_registry),
960 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
961 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
962 circuit_breaker_threshold: opts
963 .circuit_breaker_threshold
964 .unwrap_or(base.circuit_breaker_threshold),
965 auto_compact: opts.auto_compact,
966 auto_compact_threshold: opts
967 .auto_compact_threshold
968 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
969 max_context_tokens: base.max_context_tokens,
970 llm_client: Some(Arc::clone(&llm_client)),
971 memory: memory.clone(),
972 continuation_enabled: opts
973 .continuation_enabled
974 .unwrap_or(base.continuation_enabled),
975 max_continuation_turns: opts
976 .max_continuation_turns
977 .unwrap_or(base.max_continuation_turns),
978 ..base
979 };
980
981 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
984 let command_queue = if let Some(ref queue_config) = opts.queue_config {
985 let session_id = uuid::Uuid::new_v4().to_string();
986 let rt = tokio::runtime::Handle::try_current();
987
988 match rt {
989 Ok(handle) => {
990 let queue = tokio::task::block_in_place(|| {
992 handle.block_on(SessionLaneQueue::new(
993 &session_id,
994 queue_config.clone(),
995 agent_event_tx.clone(),
996 ))
997 });
998 match queue {
999 Ok(q) => {
1000 let q = Arc::new(q);
1002 let q2 = Arc::clone(&q);
1003 tokio::task::block_in_place(|| {
1004 handle.block_on(async { q2.start().await.ok() })
1005 });
1006 Some(q)
1007 }
1008 Err(e) => {
1009 tracing::warn!("Failed to create session lane queue: {}", e);
1010 None
1011 }
1012 }
1013 }
1014 Err(_) => {
1015 tracing::warn!(
1016 "No async runtime available for queue creation — queue disabled"
1017 );
1018 None
1019 }
1020 }
1021 } else {
1022 None
1023 };
1024
1025 let mut tool_context = ToolContext::new(canonical.clone());
1027 if let Some(ref search_config) = self.code_config.search {
1028 tool_context = tool_context.with_search_config(search_config.clone());
1029 }
1030 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1031
1032 #[cfg(feature = "sandbox")]
1034 if let Some(ref sandbox_cfg) = opts.sandbox_config {
1035 let handle: Arc<dyn crate::sandbox::BashSandbox> =
1036 Arc::new(crate::sandbox::BoxSandboxHandle::new(
1037 sandbox_cfg.clone(),
1038 canonical.display().to_string(),
1039 ));
1040 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1043 tool_context = tool_context.with_sandbox(handle);
1044 }
1045 #[cfg(not(feature = "sandbox"))]
1046 if opts.sandbox_config.is_some() {
1047 tracing::warn!(
1048 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
1049 — bash commands will run locally"
1050 );
1051 }
1052
1053 let session_id = opts
1054 .session_id
1055 .clone()
1056 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1057
1058 let session_store = if opts.session_store.is_some() {
1060 opts.session_store.clone()
1061 } else if let Some(ref dir) = self.code_config.sessions_dir {
1062 match tokio::runtime::Handle::try_current() {
1063 Ok(handle) => {
1064 let dir = dir.clone();
1065 match tokio::task::block_in_place(|| {
1066 handle.block_on(crate::store::FileSessionStore::new(dir))
1067 }) {
1068 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1069 Err(e) => {
1070 tracing::warn!(
1071 "Failed to create session store from sessions_dir: {}",
1072 e
1073 );
1074 None
1075 }
1076 }
1077 }
1078 Err(_) => {
1079 tracing::warn!(
1080 "No async runtime for sessions_dir store — persistence disabled"
1081 );
1082 None
1083 }
1084 }
1085 } else {
1086 None
1087 };
1088
1089 Ok(AgentSession {
1090 llm_client,
1091 tool_executor,
1092 tool_context,
1093 memory: config.memory.clone(),
1094 config,
1095 workspace: canonical,
1096 session_id,
1097 history: RwLock::new(Vec::new()),
1098 command_queue,
1099 session_store,
1100 auto_save: opts.auto_save,
1101 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1102 init_warning,
1103 command_registry: CommandRegistry::new(),
1104 model_name: opts
1105 .model
1106 .clone()
1107 .or_else(|| self.code_config.default_model.clone())
1108 .unwrap_or_else(|| "unknown".to_string()),
1109 mcp_manager: opts
1110 .mcp_manager
1111 .clone()
1112 .or_else(|| self.global_mcp.clone())
1113 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1114 })
1115 }
1116}
1117
1118pub struct AgentSession {
1127 llm_client: Arc<dyn LlmClient>,
1128 tool_executor: Arc<ToolExecutor>,
1129 tool_context: ToolContext,
1130 config: AgentConfig,
1131 workspace: PathBuf,
1132 session_id: String,
1134 history: RwLock<Vec<Message>>,
1136 command_queue: Option<Arc<SessionLaneQueue>>,
1138 memory: Option<Arc<crate::memory::AgentMemory>>,
1140 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1142 auto_save: bool,
1144 hook_engine: Arc<crate::hooks::HookEngine>,
1146 init_warning: Option<String>,
1148 command_registry: CommandRegistry,
1150 model_name: String,
1152 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1154}
1155
1156impl std::fmt::Debug for AgentSession {
1157 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1158 f.debug_struct("AgentSession")
1159 .field("session_id", &self.session_id)
1160 .field("workspace", &self.workspace.display().to_string())
1161 .field("auto_save", &self.auto_save)
1162 .finish()
1163 }
1164}
1165
1166impl AgentSession {
1167 fn build_agent_loop(&self) -> AgentLoop {
1171 let mut config = self.config.clone();
1172 config.hook_engine =
1173 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1174 config.tools = self.tool_executor.definitions();
1178 let mut agent_loop = AgentLoop::new(
1179 self.llm_client.clone(),
1180 self.tool_executor.clone(),
1181 self.tool_context.clone(),
1182 config,
1183 );
1184 if let Some(ref queue) = self.command_queue {
1185 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1186 }
1187 agent_loop
1188 }
1189
1190 fn build_command_context(&self) -> CommandContext {
1192 let history = self.history.read().unwrap();
1193
1194 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1196
1197 let mut mcp_map: std::collections::HashMap<String, usize> =
1199 std::collections::HashMap::new();
1200 for name in &tool_names {
1201 if let Some(rest) = name.strip_prefix("mcp__") {
1202 if let Some((server, _)) = rest.split_once("__") {
1203 *mcp_map.entry(server.to_string()).or_default() += 1;
1204 }
1205 }
1206 }
1207 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1208 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1209
1210 CommandContext {
1211 session_id: self.session_id.clone(),
1212 workspace: self.workspace.display().to_string(),
1213 model: self.model_name.clone(),
1214 history_len: history.len(),
1215 total_tokens: 0,
1216 total_cost: 0.0,
1217 tool_names,
1218 mcp_servers,
1219 }
1220 }
1221
1222 pub fn command_registry(&self) -> &CommandRegistry {
1224 &self.command_registry
1225 }
1226
1227 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1229 self.command_registry.register(cmd);
1230 }
1231
1232 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1241 if CommandRegistry::is_command(prompt) {
1243 let ctx = self.build_command_context();
1244 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1245 return Ok(AgentResult {
1246 text: output.text,
1247 messages: history
1248 .map(|h| h.to_vec())
1249 .unwrap_or_else(|| self.history.read().unwrap().clone()),
1250 tool_calls_count: 0,
1251 usage: crate::llm::TokenUsage::default(),
1252 });
1253 }
1254 }
1255
1256 if let Some(ref w) = self.init_warning {
1257 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1258 }
1259 let agent_loop = self.build_agent_loop();
1260
1261 let use_internal = history.is_none();
1262 let effective_history = match history {
1263 Some(h) => h.to_vec(),
1264 None => self.history.read().unwrap().clone(),
1265 };
1266
1267 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1268
1269 if use_internal {
1272 *self.history.write().unwrap() = result.messages.clone();
1273
1274 if self.auto_save {
1276 if let Err(e) = self.save().await {
1277 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1278 }
1279 }
1280 }
1281
1282 Ok(result)
1283 }
1284
1285 pub async fn send_with_attachments(
1290 &self,
1291 prompt: &str,
1292 attachments: &[crate::llm::Attachment],
1293 history: Option<&[Message]>,
1294 ) -> Result<AgentResult> {
1295 let use_internal = history.is_none();
1299 let mut effective_history = match history {
1300 Some(h) => h.to_vec(),
1301 None => self.history.read().unwrap().clone(),
1302 };
1303 effective_history.push(Message::user_with_attachments(prompt, attachments));
1304
1305 let agent_loop = self.build_agent_loop();
1306 let result = agent_loop
1307 .execute_from_messages(effective_history, None, None)
1308 .await?;
1309
1310 if use_internal {
1311 *self.history.write().unwrap() = result.messages.clone();
1312 if self.auto_save {
1313 if let Err(e) = self.save().await {
1314 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1315 }
1316 }
1317 }
1318
1319 Ok(result)
1320 }
1321
1322 pub async fn stream_with_attachments(
1327 &self,
1328 prompt: &str,
1329 attachments: &[crate::llm::Attachment],
1330 history: Option<&[Message]>,
1331 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1332 let (tx, rx) = mpsc::channel(256);
1333 let mut effective_history = match history {
1334 Some(h) => h.to_vec(),
1335 None => self.history.read().unwrap().clone(),
1336 };
1337 effective_history.push(Message::user_with_attachments(prompt, attachments));
1338
1339 let agent_loop = self.build_agent_loop();
1340 let handle = tokio::spawn(async move {
1341 let _ = agent_loop
1342 .execute_from_messages(effective_history, None, Some(tx))
1343 .await;
1344 });
1345
1346 Ok((rx, handle))
1347 }
1348
1349 pub async fn stream(
1359 &self,
1360 prompt: &str,
1361 history: Option<&[Message]>,
1362 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1363 if CommandRegistry::is_command(prompt) {
1365 let ctx = self.build_command_context();
1366 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1367 let (tx, rx) = mpsc::channel(256);
1368 let handle = tokio::spawn(async move {
1369 let _ = tx
1370 .send(AgentEvent::TextDelta {
1371 text: output.text.clone(),
1372 })
1373 .await;
1374 let _ = tx
1375 .send(AgentEvent::End {
1376 text: output.text.clone(),
1377 usage: crate::llm::TokenUsage::default(),
1378 })
1379 .await;
1380 });
1381 return Ok((rx, handle));
1382 }
1383 }
1384
1385 let (tx, rx) = mpsc::channel(256);
1386 let agent_loop = self.build_agent_loop();
1387 let effective_history = match history {
1388 Some(h) => h.to_vec(),
1389 None => self.history.read().unwrap().clone(),
1390 };
1391 let prompt = prompt.to_string();
1392
1393 let handle = tokio::spawn(async move {
1394 let _ = agent_loop
1395 .execute(&effective_history, &prompt, Some(tx))
1396 .await;
1397 });
1398
1399 Ok((rx, handle))
1400 }
1401
1402 pub fn history(&self) -> Vec<Message> {
1404 self.history.read().unwrap().clone()
1405 }
1406
1407 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1409 self.memory.as_ref()
1410 }
1411
1412 pub fn id(&self) -> &str {
1414 &self.session_id
1415 }
1416
1417 pub fn workspace(&self) -> &std::path::Path {
1419 &self.workspace
1420 }
1421
1422 pub fn init_warning(&self) -> Option<&str> {
1424 self.init_warning.as_deref()
1425 }
1426
1427 pub fn session_id(&self) -> &str {
1429 &self.session_id
1430 }
1431
1432 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
1438 self.tool_executor.definitions()
1439 }
1440
1441 pub fn tool_names(&self) -> Vec<String> {
1447 self.tool_executor
1448 .definitions()
1449 .into_iter()
1450 .map(|t| t.name)
1451 .collect()
1452 }
1453
1454 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1460 self.hook_engine.register(hook);
1461 }
1462
1463 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1465 self.hook_engine.unregister(hook_id)
1466 }
1467
1468 pub fn register_hook_handler(
1470 &self,
1471 hook_id: &str,
1472 handler: Arc<dyn crate::hooks::HookHandler>,
1473 ) {
1474 self.hook_engine.register_handler(hook_id, handler);
1475 }
1476
1477 pub fn unregister_hook_handler(&self, hook_id: &str) {
1479 self.hook_engine.unregister_handler(hook_id);
1480 }
1481
1482 pub fn hook_count(&self) -> usize {
1484 self.hook_engine.hook_count()
1485 }
1486
1487 pub async fn save(&self) -> Result<()> {
1491 let store = match &self.session_store {
1492 Some(s) => s,
1493 None => return Ok(()),
1494 };
1495
1496 let history = self.history.read().unwrap().clone();
1497 let now = chrono::Utc::now().timestamp();
1498
1499 let data = crate::store::SessionData {
1500 id: self.session_id.clone(),
1501 config: crate::session::SessionConfig {
1502 name: String::new(),
1503 workspace: self.workspace.display().to_string(),
1504 system_prompt: Some(self.config.prompt_slots.build()),
1505 max_context_length: 200_000,
1506 auto_compact: false,
1507 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1508 storage_type: crate::config::StorageBackend::File,
1509 queue_config: None,
1510 confirmation_policy: None,
1511 permission_policy: None,
1512 parent_id: None,
1513 security_config: None,
1514 hook_engine: None,
1515 planning_enabled: self.config.planning_enabled,
1516 goal_tracking: self.config.goal_tracking,
1517 },
1518 state: crate::session::SessionState::Active,
1519 messages: history,
1520 context_usage: crate::session::ContextUsage::default(),
1521 total_usage: crate::llm::TokenUsage::default(),
1522 total_cost: 0.0,
1523 model_name: None,
1524 cost_records: Vec::new(),
1525 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1526 thinking_enabled: false,
1527 thinking_budget: None,
1528 created_at: now,
1529 updated_at: now,
1530 llm_config: None,
1531 tasks: Vec::new(),
1532 parent_id: None,
1533 };
1534
1535 store.save(&data).await?;
1536 tracing::debug!("Session {} saved", self.session_id);
1537 Ok(())
1538 }
1539
1540 pub async fn read_file(&self, path: &str) -> Result<String> {
1542 let args = serde_json::json!({ "file_path": path });
1543 let result = self.tool_executor.execute("read", &args).await?;
1544 Ok(result.output)
1545 }
1546
1547 pub async fn bash(&self, command: &str) -> Result<String> {
1552 let args = serde_json::json!({ "command": command });
1553 let result = self
1554 .tool_executor
1555 .execute_with_context("bash", &args, &self.tool_context)
1556 .await?;
1557 Ok(result.output)
1558 }
1559
1560 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1562 let args = serde_json::json!({ "pattern": pattern });
1563 let result = self.tool_executor.execute("glob", &args).await?;
1564 let files: Vec<String> = result
1565 .output
1566 .lines()
1567 .filter(|l| !l.is_empty())
1568 .map(|l| l.to_string())
1569 .collect();
1570 Ok(files)
1571 }
1572
1573 pub async fn grep(&self, pattern: &str) -> Result<String> {
1575 let args = serde_json::json!({ "pattern": pattern });
1576 let result = self.tool_executor.execute("grep", &args).await?;
1577 Ok(result.output)
1578 }
1579
1580 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1582 let result = self.tool_executor.execute(name, &args).await?;
1583 Ok(ToolCallResult {
1584 name: name.to_string(),
1585 output: result.output,
1586 exit_code: result.exit_code,
1587 })
1588 }
1589
1590 pub fn has_queue(&self) -> bool {
1596 self.command_queue.is_some()
1597 }
1598
1599 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1603 if let Some(ref queue) = self.command_queue {
1604 queue.set_lane_handler(lane, config).await;
1605 }
1606 }
1607
1608 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1612 if let Some(ref queue) = self.command_queue {
1613 queue.complete_external_task(task_id, result).await
1614 } else {
1615 false
1616 }
1617 }
1618
1619 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1621 if let Some(ref queue) = self.command_queue {
1622 queue.pending_external_tasks().await
1623 } else {
1624 Vec::new()
1625 }
1626 }
1627
1628 pub async fn queue_stats(&self) -> SessionQueueStats {
1630 if let Some(ref queue) = self.command_queue {
1631 queue.stats().await
1632 } else {
1633 SessionQueueStats::default()
1634 }
1635 }
1636
1637 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1639 if let Some(ref queue) = self.command_queue {
1640 queue.metrics_snapshot().await
1641 } else {
1642 None
1643 }
1644 }
1645
1646 pub async fn submit(
1652 &self,
1653 lane: SessionLane,
1654 command: Box<dyn crate::queue::SessionCommand>,
1655 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1656 let queue = self
1657 .command_queue
1658 .as_ref()
1659 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1660 Ok(queue.submit(lane, command).await)
1661 }
1662
1663 pub async fn submit_batch(
1670 &self,
1671 lane: SessionLane,
1672 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1673 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
1674 {
1675 let queue = self
1676 .command_queue
1677 .as_ref()
1678 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1679 Ok(queue.submit_batch(lane, commands).await)
1680 }
1681
1682 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1684 if let Some(ref queue) = self.command_queue {
1685 queue.dead_letters().await
1686 } else {
1687 Vec::new()
1688 }
1689 }
1690
1691 pub async fn add_mcp_server(
1702 &self,
1703 config: crate::mcp::McpServerConfig,
1704 ) -> crate::error::Result<usize> {
1705 let server_name = config.name.clone();
1706 self.mcp_manager.register_server(config).await;
1707 self.mcp_manager.connect(&server_name).await.map_err(|e| {
1708 crate::error::CodeError::Tool {
1709 tool: server_name.clone(),
1710 message: format!("Failed to connect MCP server: {}", e),
1711 }
1712 })?;
1713
1714 let tools = self.mcp_manager.get_server_tools(&server_name).await;
1715 let count = tools.len();
1716
1717 for tool in
1718 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
1719 {
1720 self.tool_executor.register_dynamic_tool(tool);
1721 }
1722
1723 tracing::info!(
1724 session_id = %self.session_id,
1725 server = server_name,
1726 tools = count,
1727 "MCP server added to live session"
1728 );
1729
1730 Ok(count)
1731 }
1732
1733 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
1738 self.tool_executor
1739 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
1740 self.mcp_manager
1741 .disconnect(server_name)
1742 .await
1743 .map_err(|e| crate::error::CodeError::Tool {
1744 tool: server_name.to_string(),
1745 message: format!("Failed to disconnect MCP server: {}", e),
1746 })?;
1747 tracing::info!(
1748 session_id = %self.session_id,
1749 server = server_name,
1750 "MCP server removed from live session"
1751 );
1752 Ok(())
1753 }
1754
1755 pub async fn mcp_status(
1757 &self,
1758 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
1759 self.mcp_manager.get_status().await
1760 }
1761}
1762
1763#[cfg(test)]
1768mod tests {
1769 use super::*;
1770 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1771 use crate::store::SessionStore;
1772
1773 #[tokio::test]
1774 async fn test_session_submit_no_queue_returns_err() {
1775 let agent = Agent::from_config(test_config()).await.unwrap();
1776 let session = agent.session(".", None).unwrap();
1777 struct Noop;
1778 #[async_trait::async_trait]
1779 impl crate::queue::SessionCommand for Noop {
1780 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1781 Ok(serde_json::json!(null))
1782 }
1783 fn command_type(&self) -> &str {
1784 "noop"
1785 }
1786 }
1787 let result: anyhow::Result<
1788 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
1789 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
1790 assert!(result.is_err());
1791 assert!(result.unwrap_err().to_string().contains("No queue"));
1792 }
1793
1794 #[tokio::test]
1795 async fn test_session_submit_batch_no_queue_returns_err() {
1796 let agent = Agent::from_config(test_config()).await.unwrap();
1797 let session = agent.session(".", None).unwrap();
1798 struct Noop;
1799 #[async_trait::async_trait]
1800 impl crate::queue::SessionCommand for Noop {
1801 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1802 Ok(serde_json::json!(null))
1803 }
1804 fn command_type(&self) -> &str {
1805 "noop"
1806 }
1807 }
1808 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1809 let result: anyhow::Result<
1810 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
1811 > = session.submit_batch(SessionLane::Query, cmds).await;
1812 assert!(result.is_err());
1813 assert!(result.unwrap_err().to_string().contains("No queue"));
1814 }
1815
1816 fn test_config() -> CodeConfig {
1817 CodeConfig {
1818 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1819 providers: vec![
1820 ProviderConfig {
1821 name: "anthropic".to_string(),
1822 api_key: Some("test-key".to_string()),
1823 base_url: None,
1824 models: vec![ModelConfig {
1825 id: "claude-sonnet-4-20250514".to_string(),
1826 name: "Claude Sonnet 4".to_string(),
1827 family: "claude-sonnet".to_string(),
1828 api_key: None,
1829 base_url: None,
1830 attachment: false,
1831 reasoning: false,
1832 tool_call: true,
1833 temperature: true,
1834 release_date: None,
1835 modalities: ModelModalities::default(),
1836 cost: Default::default(),
1837 limit: Default::default(),
1838 }],
1839 },
1840 ProviderConfig {
1841 name: "openai".to_string(),
1842 api_key: Some("test-openai-key".to_string()),
1843 base_url: None,
1844 models: vec![ModelConfig {
1845 id: "gpt-4o".to_string(),
1846 name: "GPT-4o".to_string(),
1847 family: "gpt-4".to_string(),
1848 api_key: None,
1849 base_url: None,
1850 attachment: false,
1851 reasoning: false,
1852 tool_call: true,
1853 temperature: true,
1854 release_date: None,
1855 modalities: ModelModalities::default(),
1856 cost: Default::default(),
1857 limit: Default::default(),
1858 }],
1859 },
1860 ],
1861 ..Default::default()
1862 }
1863 }
1864
1865 #[tokio::test]
1866 async fn test_from_config() {
1867 let agent = Agent::from_config(test_config()).await;
1868 assert!(agent.is_ok());
1869 }
1870
1871 #[tokio::test]
1872 async fn test_session_default() {
1873 let agent = Agent::from_config(test_config()).await.unwrap();
1874 let session = agent.session("/tmp/test-workspace", None);
1875 assert!(session.is_ok());
1876 let debug = format!("{:?}", session.unwrap());
1877 assert!(debug.contains("AgentSession"));
1878 }
1879
1880 #[tokio::test]
1881 async fn test_session_with_model_override() {
1882 let agent = Agent::from_config(test_config()).await.unwrap();
1883 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1884 let session = agent.session("/tmp/test-workspace", Some(opts));
1885 assert!(session.is_ok());
1886 }
1887
1888 #[tokio::test]
1889 async fn test_session_with_invalid_model_format() {
1890 let agent = Agent::from_config(test_config()).await.unwrap();
1891 let opts = SessionOptions::new().with_model("gpt-4o");
1892 let session = agent.session("/tmp/test-workspace", Some(opts));
1893 assert!(session.is_err());
1894 }
1895
1896 #[tokio::test]
1897 async fn test_session_with_model_not_found() {
1898 let agent = Agent::from_config(test_config()).await.unwrap();
1899 let opts = SessionOptions::new().with_model("openai/nonexistent");
1900 let session = agent.session("/tmp/test-workspace", Some(opts));
1901 assert!(session.is_err());
1902 }
1903
1904 #[tokio::test]
1905 async fn test_new_with_hcl_string() {
1906 let hcl = r#"
1907 default_model = "anthropic/claude-sonnet-4-20250514"
1908 providers {
1909 name = "anthropic"
1910 api_key = "test-key"
1911 models {
1912 id = "claude-sonnet-4-20250514"
1913 name = "Claude Sonnet 4"
1914 }
1915 }
1916 "#;
1917 let agent = Agent::new(hcl).await;
1918 assert!(agent.is_ok());
1919 }
1920
1921 #[tokio::test]
1922 async fn test_create_alias_hcl() {
1923 let hcl = r#"
1924 default_model = "anthropic/claude-sonnet-4-20250514"
1925 providers {
1926 name = "anthropic"
1927 api_key = "test-key"
1928 models {
1929 id = "claude-sonnet-4-20250514"
1930 name = "Claude Sonnet 4"
1931 }
1932 }
1933 "#;
1934 let agent = Agent::create(hcl).await;
1935 assert!(agent.is_ok());
1936 }
1937
1938 #[tokio::test]
1939 async fn test_create_and_new_produce_same_result() {
1940 let hcl = r#"
1941 default_model = "anthropic/claude-sonnet-4-20250514"
1942 providers {
1943 name = "anthropic"
1944 api_key = "test-key"
1945 models {
1946 id = "claude-sonnet-4-20250514"
1947 name = "Claude Sonnet 4"
1948 }
1949 }
1950 "#;
1951 let agent_new = Agent::new(hcl).await;
1952 let agent_create = Agent::create(hcl).await;
1953 assert!(agent_new.is_ok());
1954 assert!(agent_create.is_ok());
1955
1956 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
1958 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
1959 assert!(session_new.is_ok());
1960 assert!(session_create.is_ok());
1961 }
1962
1963 #[test]
1964 fn test_from_config_requires_default_model() {
1965 let rt = tokio::runtime::Runtime::new().unwrap();
1966 let config = CodeConfig {
1967 providers: vec![ProviderConfig {
1968 name: "anthropic".to_string(),
1969 api_key: Some("test-key".to_string()),
1970 base_url: None,
1971 models: vec![],
1972 }],
1973 ..Default::default()
1974 };
1975 let result = rt.block_on(Agent::from_config(config));
1976 assert!(result.is_err());
1977 }
1978
1979 #[tokio::test]
1980 async fn test_history_empty_on_new_session() {
1981 let agent = Agent::from_config(test_config()).await.unwrap();
1982 let session = agent.session("/tmp/test-workspace", None).unwrap();
1983 assert!(session.history().is_empty());
1984 }
1985
1986 #[tokio::test]
1987 async fn test_session_options_with_agent_dir() {
1988 let opts = SessionOptions::new()
1989 .with_agent_dir("/tmp/agents")
1990 .with_agent_dir("/tmp/more-agents");
1991 assert_eq!(opts.agent_dirs.len(), 2);
1992 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
1993 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
1994 }
1995
1996 #[test]
2001 fn test_session_options_with_queue_config() {
2002 let qc = SessionQueueConfig::default().with_lane_features();
2003 let opts = SessionOptions::new().with_queue_config(qc.clone());
2004 assert!(opts.queue_config.is_some());
2005
2006 let config = opts.queue_config.unwrap();
2007 assert!(config.enable_dlq);
2008 assert!(config.enable_metrics);
2009 assert!(config.enable_alerts);
2010 assert_eq!(config.default_timeout_ms, Some(60_000));
2011 }
2012
2013 #[tokio::test(flavor = "multi_thread")]
2014 async fn test_session_with_queue_config() {
2015 let agent = Agent::from_config(test_config()).await.unwrap();
2016 let qc = SessionQueueConfig::default();
2017 let opts = SessionOptions::new().with_queue_config(qc);
2018 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
2019 assert!(session.is_ok());
2020 let session = session.unwrap();
2021 assert!(session.has_queue());
2022 }
2023
2024 #[tokio::test]
2025 async fn test_session_without_queue_config() {
2026 let agent = Agent::from_config(test_config()).await.unwrap();
2027 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
2028 assert!(!session.has_queue());
2029 }
2030
2031 #[tokio::test]
2032 async fn test_session_queue_stats_without_queue() {
2033 let agent = Agent::from_config(test_config()).await.unwrap();
2034 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
2035 let stats = session.queue_stats().await;
2036 assert_eq!(stats.total_pending, 0);
2038 assert_eq!(stats.total_active, 0);
2039 }
2040
2041 #[tokio::test(flavor = "multi_thread")]
2042 async fn test_session_queue_stats_with_queue() {
2043 let agent = Agent::from_config(test_config()).await.unwrap();
2044 let qc = SessionQueueConfig::default();
2045 let opts = SessionOptions::new().with_queue_config(qc);
2046 let session = agent
2047 .session("/tmp/test-workspace-qstats", Some(opts))
2048 .unwrap();
2049 let stats = session.queue_stats().await;
2050 assert_eq!(stats.total_pending, 0);
2052 assert_eq!(stats.total_active, 0);
2053 }
2054
2055 #[tokio::test(flavor = "multi_thread")]
2056 async fn test_session_pending_external_tasks_empty() {
2057 let agent = Agent::from_config(test_config()).await.unwrap();
2058 let qc = SessionQueueConfig::default();
2059 let opts = SessionOptions::new().with_queue_config(qc);
2060 let session = agent
2061 .session("/tmp/test-workspace-ext", Some(opts))
2062 .unwrap();
2063 let tasks = session.pending_external_tasks().await;
2064 assert!(tasks.is_empty());
2065 }
2066
2067 #[tokio::test(flavor = "multi_thread")]
2068 async fn test_session_dead_letters_empty() {
2069 let agent = Agent::from_config(test_config()).await.unwrap();
2070 let qc = SessionQueueConfig::default().with_dlq(Some(100));
2071 let opts = SessionOptions::new().with_queue_config(qc);
2072 let session = agent
2073 .session("/tmp/test-workspace-dlq", Some(opts))
2074 .unwrap();
2075 let dead = session.dead_letters().await;
2076 assert!(dead.is_empty());
2077 }
2078
2079 #[tokio::test(flavor = "multi_thread")]
2080 async fn test_session_queue_metrics_disabled() {
2081 let agent = Agent::from_config(test_config()).await.unwrap();
2082 let qc = SessionQueueConfig::default();
2084 let opts = SessionOptions::new().with_queue_config(qc);
2085 let session = agent
2086 .session("/tmp/test-workspace-nomet", Some(opts))
2087 .unwrap();
2088 let metrics = session.queue_metrics().await;
2089 assert!(metrics.is_none());
2090 }
2091
2092 #[tokio::test(flavor = "multi_thread")]
2093 async fn test_session_queue_metrics_enabled() {
2094 let agent = Agent::from_config(test_config()).await.unwrap();
2095 let qc = SessionQueueConfig::default().with_metrics();
2096 let opts = SessionOptions::new().with_queue_config(qc);
2097 let session = agent
2098 .session("/tmp/test-workspace-met", Some(opts))
2099 .unwrap();
2100 let metrics = session.queue_metrics().await;
2101 assert!(metrics.is_some());
2102 }
2103
2104 #[tokio::test(flavor = "multi_thread")]
2105 async fn test_session_set_lane_handler() {
2106 let agent = Agent::from_config(test_config()).await.unwrap();
2107 let qc = SessionQueueConfig::default();
2108 let opts = SessionOptions::new().with_queue_config(qc);
2109 let session = agent
2110 .session("/tmp/test-workspace-handler", Some(opts))
2111 .unwrap();
2112
2113 session
2115 .set_lane_handler(
2116 SessionLane::Execute,
2117 LaneHandlerConfig {
2118 mode: crate::queue::TaskHandlerMode::External,
2119 timeout_ms: 30_000,
2120 },
2121 )
2122 .await;
2123
2124 }
2127
2128 #[tokio::test(flavor = "multi_thread")]
2133 async fn test_session_has_id() {
2134 let agent = Agent::from_config(test_config()).await.unwrap();
2135 let session = agent.session("/tmp/test-ws-id", None).unwrap();
2136 assert!(!session.session_id().is_empty());
2138 assert_eq!(session.session_id().len(), 36); }
2140
2141 #[tokio::test(flavor = "multi_thread")]
2142 async fn test_session_explicit_id() {
2143 let agent = Agent::from_config(test_config()).await.unwrap();
2144 let opts = SessionOptions::new().with_session_id("my-session-42");
2145 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
2146 assert_eq!(session.session_id(), "my-session-42");
2147 }
2148
2149 #[tokio::test(flavor = "multi_thread")]
2150 async fn test_session_save_no_store() {
2151 let agent = Agent::from_config(test_config()).await.unwrap();
2152 let session = agent.session("/tmp/test-ws-save", None).unwrap();
2153 session.save().await.unwrap();
2155 }
2156
2157 #[tokio::test(flavor = "multi_thread")]
2158 async fn test_session_save_and_load() {
2159 let store = Arc::new(crate::store::MemorySessionStore::new());
2160 let agent = Agent::from_config(test_config()).await.unwrap();
2161
2162 let opts = SessionOptions::new()
2163 .with_session_store(store.clone())
2164 .with_session_id("persist-test");
2165 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
2166
2167 session.save().await.unwrap();
2169
2170 assert!(store.exists("persist-test").await.unwrap());
2172
2173 let data = store.load("persist-test").await.unwrap().unwrap();
2174 assert_eq!(data.id, "persist-test");
2175 assert!(data.messages.is_empty());
2176 }
2177
2178 #[tokio::test(flavor = "multi_thread")]
2179 async fn test_session_save_with_history() {
2180 let store = Arc::new(crate::store::MemorySessionStore::new());
2181 let agent = Agent::from_config(test_config()).await.unwrap();
2182
2183 let opts = SessionOptions::new()
2184 .with_session_store(store.clone())
2185 .with_session_id("history-test");
2186 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
2187
2188 {
2190 let mut h = session.history.write().unwrap();
2191 h.push(Message::user("Hello"));
2192 h.push(Message::user("How are you?"));
2193 }
2194
2195 session.save().await.unwrap();
2196
2197 let data = store.load("history-test").await.unwrap().unwrap();
2198 assert_eq!(data.messages.len(), 2);
2199 }
2200
2201 #[tokio::test(flavor = "multi_thread")]
2202 async fn test_resume_session() {
2203 let store = Arc::new(crate::store::MemorySessionStore::new());
2204 let agent = Agent::from_config(test_config()).await.unwrap();
2205
2206 let opts = SessionOptions::new()
2208 .with_session_store(store.clone())
2209 .with_session_id("resume-test");
2210 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
2211 {
2212 let mut h = session.history.write().unwrap();
2213 h.push(Message::user("What is Rust?"));
2214 h.push(Message::user("Tell me more"));
2215 }
2216 session.save().await.unwrap();
2217
2218 let opts2 = SessionOptions::new().with_session_store(store.clone());
2220 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2221
2222 assert_eq!(resumed.session_id(), "resume-test");
2223 let history = resumed.history();
2224 assert_eq!(history.len(), 2);
2225 assert_eq!(history[0].text(), "What is Rust?");
2226 }
2227
2228 #[tokio::test(flavor = "multi_thread")]
2229 async fn test_resume_session_not_found() {
2230 let store = Arc::new(crate::store::MemorySessionStore::new());
2231 let agent = Agent::from_config(test_config()).await.unwrap();
2232
2233 let opts = SessionOptions::new().with_session_store(store.clone());
2234 let result = agent.resume_session("nonexistent", opts);
2235 assert!(result.is_err());
2236 assert!(result.unwrap_err().to_string().contains("not found"));
2237 }
2238
2239 #[tokio::test(flavor = "multi_thread")]
2240 async fn test_resume_session_no_store() {
2241 let agent = Agent::from_config(test_config()).await.unwrap();
2242 let opts = SessionOptions::new();
2243 let result = agent.resume_session("any-id", opts);
2244 assert!(result.is_err());
2245 assert!(result.unwrap_err().to_string().contains("session_store"));
2246 }
2247
2248 #[tokio::test(flavor = "multi_thread")]
2249 async fn test_file_session_store_persistence() {
2250 let dir = tempfile::TempDir::new().unwrap();
2251 let store = Arc::new(
2252 crate::store::FileSessionStore::new(dir.path())
2253 .await
2254 .unwrap(),
2255 );
2256 let agent = Agent::from_config(test_config()).await.unwrap();
2257
2258 let opts = SessionOptions::new()
2260 .with_session_store(store.clone())
2261 .with_session_id("file-persist");
2262 let session = agent
2263 .session("/tmp/test-ws-file-persist", Some(opts))
2264 .unwrap();
2265 {
2266 let mut h = session.history.write().unwrap();
2267 h.push(Message::user("test message"));
2268 }
2269 session.save().await.unwrap();
2270
2271 let store2 = Arc::new(
2273 crate::store::FileSessionStore::new(dir.path())
2274 .await
2275 .unwrap(),
2276 );
2277 let data = store2.load("file-persist").await.unwrap().unwrap();
2278 assert_eq!(data.messages.len(), 1);
2279 }
2280
2281 #[tokio::test(flavor = "multi_thread")]
2282 async fn test_session_options_builders() {
2283 let opts = SessionOptions::new()
2284 .with_session_id("test-id")
2285 .with_auto_save(true);
2286 assert_eq!(opts.session_id, Some("test-id".to_string()));
2287 assert!(opts.auto_save);
2288 }
2289
2290 #[test]
2295 fn test_session_options_with_sandbox_sets_config() {
2296 use crate::sandbox::SandboxConfig;
2297 let cfg = SandboxConfig {
2298 image: "ubuntu:22.04".into(),
2299 memory_mb: 1024,
2300 ..SandboxConfig::default()
2301 };
2302 let opts = SessionOptions::new().with_sandbox(cfg);
2303 assert!(opts.sandbox_config.is_some());
2304 let sc = opts.sandbox_config.unwrap();
2305 assert_eq!(sc.image, "ubuntu:22.04");
2306 assert_eq!(sc.memory_mb, 1024);
2307 }
2308
2309 #[test]
2310 fn test_session_options_default_has_no_sandbox() {
2311 let opts = SessionOptions::default();
2312 assert!(opts.sandbox_config.is_none());
2313 }
2314
2315 #[tokio::test]
2316 async fn test_session_debug_includes_sandbox_config() {
2317 use crate::sandbox::SandboxConfig;
2318 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
2319 let debug = format!("{:?}", opts);
2320 assert!(debug.contains("sandbox_config"));
2321 }
2322
2323 #[tokio::test]
2324 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2325 let agent = Agent::from_config(test_config()).await.unwrap();
2328 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2329 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2331 assert!(session.is_ok());
2332 }
2333
2334 #[tokio::test(flavor = "multi_thread")]
2339 async fn test_session_with_memory_store() {
2340 use a3s_memory::InMemoryStore;
2341 let store = Arc::new(InMemoryStore::new());
2342 let agent = Agent::from_config(test_config()).await.unwrap();
2343 let opts = SessionOptions::new().with_memory(store);
2344 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2345 assert!(session.memory().is_some());
2346 }
2347
2348 #[tokio::test(flavor = "multi_thread")]
2349 async fn test_session_without_memory_store() {
2350 let agent = Agent::from_config(test_config()).await.unwrap();
2351 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2352 assert!(session.memory().is_none());
2353 }
2354
2355 #[tokio::test(flavor = "multi_thread")]
2356 async fn test_session_memory_wired_into_config() {
2357 use a3s_memory::InMemoryStore;
2358 let store = Arc::new(InMemoryStore::new());
2359 let agent = Agent::from_config(test_config()).await.unwrap();
2360 let opts = SessionOptions::new().with_memory(store);
2361 let session = agent
2362 .session("/tmp/test-ws-mem-config", Some(opts))
2363 .unwrap();
2364 assert!(session.memory().is_some());
2366 }
2367
2368 #[tokio::test(flavor = "multi_thread")]
2369 async fn test_session_with_file_memory() {
2370 let dir = tempfile::TempDir::new().unwrap();
2371 let agent = Agent::from_config(test_config()).await.unwrap();
2372 let opts = SessionOptions::new().with_file_memory(dir.path());
2373 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2374 assert!(session.memory().is_some());
2375 }
2376
2377 #[tokio::test(flavor = "multi_thread")]
2378 async fn test_memory_remember_and_recall() {
2379 use a3s_memory::InMemoryStore;
2380 let store = Arc::new(InMemoryStore::new());
2381 let agent = Agent::from_config(test_config()).await.unwrap();
2382 let opts = SessionOptions::new().with_memory(store);
2383 let session = agent
2384 .session("/tmp/test-ws-mem-recall", Some(opts))
2385 .unwrap();
2386
2387 let memory = session.memory().unwrap();
2388 memory
2389 .remember_success("write a file", &["write".to_string()], "done")
2390 .await
2391 .unwrap();
2392
2393 let results = memory.recall_similar("write", 5).await.unwrap();
2394 assert!(!results.is_empty());
2395 let stats = memory.stats().await.unwrap();
2396 assert_eq!(stats.long_term_count, 1);
2397 }
2398
2399 #[tokio::test(flavor = "multi_thread")]
2404 async fn test_session_tool_timeout_configured() {
2405 let agent = Agent::from_config(test_config()).await.unwrap();
2406 let opts = SessionOptions::new().with_tool_timeout(5000);
2407 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2408 assert!(!session.id().is_empty());
2409 }
2410
2411 #[tokio::test(flavor = "multi_thread")]
2416 async fn test_session_without_queue_builds_ok() {
2417 let agent = Agent::from_config(test_config()).await.unwrap();
2418 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2419 assert!(!session.id().is_empty());
2420 }
2421
2422 #[tokio::test(flavor = "multi_thread")]
2427 async fn test_concurrent_history_reads() {
2428 let agent = Agent::from_config(test_config()).await.unwrap();
2429 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2430
2431 let handles: Vec<_> = (0..10)
2432 .map(|_| {
2433 let s = Arc::clone(&session);
2434 tokio::spawn(async move { s.history().len() })
2435 })
2436 .collect();
2437
2438 for h in handles {
2439 h.await.unwrap();
2440 }
2441 }
2442
2443 #[tokio::test(flavor = "multi_thread")]
2448 async fn test_session_no_init_warning_without_file_memory() {
2449 let agent = Agent::from_config(test_config()).await.unwrap();
2450 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2451 assert!(session.init_warning().is_none());
2452 }
2453
2454 #[tokio::test(flavor = "multi_thread")]
2455 async fn test_session_with_mcp_manager_builds_ok() {
2456 use crate::mcp::manager::McpManager;
2457 let mcp = Arc::new(McpManager::new());
2458 let agent = Agent::from_config(test_config()).await.unwrap();
2459 let opts = SessionOptions::new().with_mcp(mcp);
2460 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2462 assert!(!session.id().is_empty());
2463 }
2464
2465 #[test]
2466 fn test_session_command_is_pub() {
2467 use crate::SessionCommand;
2469 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
2470 }
2471
2472 #[tokio::test(flavor = "multi_thread")]
2473 async fn test_session_submit_with_queue_executes() {
2474 let agent = Agent::from_config(test_config()).await.unwrap();
2475 let qc = SessionQueueConfig::default();
2476 let opts = SessionOptions::new().with_queue_config(qc);
2477 let session = agent
2478 .session("/tmp/test-ws-submit-exec", Some(opts))
2479 .unwrap();
2480
2481 struct Echo(serde_json::Value);
2482 #[async_trait::async_trait]
2483 impl crate::queue::SessionCommand for Echo {
2484 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2485 Ok(self.0.clone())
2486 }
2487 fn command_type(&self) -> &str {
2488 "echo"
2489 }
2490 }
2491
2492 let rx = session
2493 .submit(
2494 SessionLane::Query,
2495 Box::new(Echo(serde_json::json!({"ok": true}))),
2496 )
2497 .await
2498 .expect("submit should succeed with queue configured");
2499
2500 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2501 .await
2502 .expect("timed out waiting for command result")
2503 .expect("channel closed before result")
2504 .expect("command returned an error");
2505
2506 assert_eq!(result["ok"], true);
2507 }
2508}