1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::Result;
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
/// Plain record describing the outcome of one tool call.
#[derive(Clone, Debug)]
pub struct ToolCallResult {
    /// Name associated with the tool call.
    pub name: String,
    /// Output text recorded for the call.
    pub output: String,
    /// Exit code recorded for the call.
    pub exit_code: i32,
}
50
51#[derive(Clone, Default)]
57pub struct SessionOptions {
58 pub model: Option<String>,
60 pub agent_dirs: Vec<PathBuf>,
63 pub queue_config: Option<SessionQueueConfig>,
68 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
70 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
72 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
74 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
76 pub planning_enabled: bool,
78 pub goal_tracking: bool,
80 pub skill_dirs: Vec<PathBuf>,
83 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
85 pub memory_store: Option<Arc<dyn MemoryStore>>,
87 pub(crate) file_memory_dir: Option<PathBuf>,
89 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
91 pub session_id: Option<String>,
93 pub auto_save: bool,
95 pub max_parse_retries: Option<u32>,
98 pub tool_timeout_ms: Option<u64>,
101 pub circuit_breaker_threshold: Option<u32>,
105 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
111 pub auto_compact: bool,
113 pub auto_compact_threshold: Option<f32>,
116 pub continuation_enabled: Option<bool>,
119 pub max_continuation_turns: Option<u32>,
122 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
127 pub temperature: Option<f32>,
129 pub thinking_budget: Option<usize>,
131 pub prompt_slots: Option<SystemPromptSlots>,
137}
138
139impl std::fmt::Debug for SessionOptions {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("SessionOptions")
142 .field("model", &self.model)
143 .field("agent_dirs", &self.agent_dirs)
144 .field("skill_dirs", &self.skill_dirs)
145 .field("queue_config", &self.queue_config)
146 .field("security_provider", &self.security_provider.is_some())
147 .field("context_providers", &self.context_providers.len())
148 .field("confirmation_manager", &self.confirmation_manager.is_some())
149 .field("permission_checker", &self.permission_checker.is_some())
150 .field("planning_enabled", &self.planning_enabled)
151 .field("goal_tracking", &self.goal_tracking)
152 .field(
153 "skill_registry",
154 &self
155 .skill_registry
156 .as_ref()
157 .map(|r| format!("{} skills", r.len())),
158 )
159 .field("memory_store", &self.memory_store.is_some())
160 .field("session_store", &self.session_store.is_some())
161 .field("session_id", &self.session_id)
162 .field("auto_save", &self.auto_save)
163 .field("max_parse_retries", &self.max_parse_retries)
164 .field("tool_timeout_ms", &self.tool_timeout_ms)
165 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
166 .field("sandbox_config", &self.sandbox_config)
167 .field("auto_compact", &self.auto_compact)
168 .field("auto_compact_threshold", &self.auto_compact_threshold)
169 .field("continuation_enabled", &self.continuation_enabled)
170 .field("max_continuation_turns", &self.max_continuation_turns)
171 .field("mcp_manager", &self.mcp_manager.is_some())
172 .field("temperature", &self.temperature)
173 .field("thinking_budget", &self.thinking_budget)
174 .field("prompt_slots", &self.prompt_slots.is_some())
175 .finish()
176 }
177}
178
179impl SessionOptions {
180 pub fn new() -> Self {
181 Self::default()
182 }
183
184 pub fn with_model(mut self, model: impl Into<String>) -> Self {
185 self.model = Some(model.into());
186 self
187 }
188
189 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
190 self.agent_dirs.push(dir.into());
191 self
192 }
193
194 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
195 self.queue_config = Some(config);
196 self
197 }
198
199 pub fn with_default_security(mut self) -> Self {
201 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
202 self
203 }
204
205 pub fn with_security_provider(
207 mut self,
208 provider: Arc<dyn crate::security::SecurityProvider>,
209 ) -> Self {
210 self.security_provider = Some(provider);
211 self
212 }
213
214 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
216 let config = crate::context::FileSystemContextConfig::new(root_path);
217 self.context_providers
218 .push(Arc::new(crate::context::FileSystemContextProvider::new(
219 config,
220 )));
221 self
222 }
223
224 pub fn with_context_provider(
226 mut self,
227 provider: Arc<dyn crate::context::ContextProvider>,
228 ) -> Self {
229 self.context_providers.push(provider);
230 self
231 }
232
233 pub fn with_confirmation_manager(
235 mut self,
236 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
237 ) -> Self {
238 self.confirmation_manager = Some(manager);
239 self
240 }
241
242 pub fn with_permission_checker(
244 mut self,
245 checker: Arc<dyn crate::permissions::PermissionChecker>,
246 ) -> Self {
247 self.permission_checker = Some(checker);
248 self
249 }
250
251 pub fn with_permissive_policy(self) -> Self {
258 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
259 }
260
261 pub fn with_planning(mut self, enabled: bool) -> Self {
263 self.planning_enabled = enabled;
264 self
265 }
266
267 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
269 self.goal_tracking = enabled;
270 self
271 }
272
273 pub fn with_builtin_skills(mut self) -> Self {
275 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
276 self
277 }
278
279 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
281 self.skill_registry = Some(registry);
282 self
283 }
284
285 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
288 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
289 self
290 }
291
292 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
294 let registry = self
295 .skill_registry
296 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
297 if let Err(e) = registry.load_from_dir(&dir) {
298 tracing::warn!(
299 dir = %dir.as_ref().display(),
300 error = %e,
301 "Failed to load skills from directory — continuing without them"
302 );
303 }
304 self.skill_registry = Some(registry);
305 self
306 }
307
308 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
310 self.memory_store = Some(store);
311 self
312 }
313
314 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
320 self.file_memory_dir = Some(dir.into());
321 self
322 }
323
324 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
326 self.session_store = Some(store);
327 self
328 }
329
330 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
332 let dir = dir.into();
333 match tokio::runtime::Handle::try_current() {
334 Ok(handle) => {
335 match tokio::task::block_in_place(|| {
336 handle.block_on(crate::store::FileSessionStore::new(dir))
337 }) {
338 Ok(store) => {
339 self.session_store =
340 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
341 }
342 Err(e) => {
343 tracing::warn!("Failed to create file session store: {}", e);
344 }
345 }
346 }
347 Err(_) => {
348 tracing::warn!(
349 "No async runtime available for file session store — persistence disabled"
350 );
351 }
352 }
353 self
354 }
355
356 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
358 self.session_id = Some(id.into());
359 self
360 }
361
362 pub fn with_auto_save(mut self, enabled: bool) -> Self {
364 self.auto_save = enabled;
365 self
366 }
367
368 pub fn with_parse_retries(mut self, max: u32) -> Self {
374 self.max_parse_retries = Some(max);
375 self
376 }
377
378 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
384 self.tool_timeout_ms = Some(timeout_ms);
385 self
386 }
387
388 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
394 self.circuit_breaker_threshold = Some(threshold);
395 self
396 }
397
398 pub fn with_resilience_defaults(self) -> Self {
404 self.with_parse_retries(2)
405 .with_tool_timeout(120_000)
406 .with_circuit_breaker(3)
407 }
408
409 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
428 self.sandbox_config = Some(config);
429 self
430 }
431
432 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
437 self.auto_compact = enabled;
438 self
439 }
440
441 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
443 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
444 self
445 }
446
447 pub fn with_continuation(mut self, enabled: bool) -> Self {
452 self.continuation_enabled = Some(enabled);
453 self
454 }
455
456 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
458 self.max_continuation_turns = Some(turns);
459 self
460 }
461
462 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
467 self.mcp_manager = Some(manager);
468 self
469 }
470
471 pub fn with_temperature(mut self, temperature: f32) -> Self {
472 self.temperature = Some(temperature);
473 self
474 }
475
476 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
477 self.thinking_budget = Some(budget);
478 self
479 }
480
481 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
486 self.prompt_slots = Some(slots);
487 self
488 }
489}
490
491pub struct Agent {
500 llm_client: Arc<dyn LlmClient>,
501 code_config: CodeConfig,
502 config: AgentConfig,
503 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
505 global_mcp_tools: Vec<(String, crate::mcp::McpTool)>,
507}
508
509impl std::fmt::Debug for Agent {
510 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
511 f.debug_struct("Agent").finish()
512 }
513}
514
515impl Agent {
516 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
520 let source = config_source.into();
521
522 let expanded = if let Some(rest) = source.strip_prefix("~/") {
524 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
525 if let Some(home) = home {
526 format!("{}/{}", home.to_string_lossy(), rest)
527 } else {
528 source.clone()
529 }
530 } else {
531 source.clone()
532 };
533
534 let path = Path::new(&expanded);
535
536 let config = if path.extension().is_some() && path.exists() {
537 CodeConfig::from_file(path)
538 .with_context(|| format!("Failed to load config: {}", path.display()))?
539 } else {
540 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
542 };
543
544 Self::from_config(config).await
545 }
546
547 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
552 Self::new(config_source).await
553 }
554
555 pub async fn from_config(config: CodeConfig) -> Result<Self> {
557 let llm_config = config
558 .default_llm_config()
559 .context("default_model must be set in 'provider/model' format with a valid API key")?;
560 let llm_client = crate::llm::create_client_with_config(llm_config);
561
562 let agent_config = AgentConfig {
563 max_tool_rounds: config
564 .max_tool_rounds
565 .unwrap_or(AgentConfig::default().max_tool_rounds),
566 ..AgentConfig::default()
567 };
568
569 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
571 (None, vec![])
572 } else {
573 let manager = Arc::new(crate::mcp::manager::McpManager::new());
574 for server in &config.mcp_servers {
575 if !server.enabled {
576 continue;
577 }
578 manager.register_server(server.clone()).await;
579 if let Err(e) = manager.connect(&server.name).await {
580 tracing::warn!(
581 server = %server.name,
582 error = %e,
583 "Failed to connect to MCP server — skipping"
584 );
585 }
586 }
587 let tools = manager.get_all_tools().await;
589 (Some(manager), tools)
590 };
591
592 let mut agent = Agent {
593 llm_client,
594 code_config: config,
595 config: agent_config,
596 global_mcp,
597 global_mcp_tools,
598 };
599
600 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
602 for dir in &agent.code_config.skill_dirs.clone() {
603 if let Err(e) = registry.load_from_dir(dir) {
604 tracing::warn!(
605 dir = %dir.display(),
606 error = %e,
607 "Failed to load skills from directory — skipping"
608 );
609 }
610 }
611 agent.config.skill_registry = Some(registry);
612
613 Ok(agent)
614 }
615
616 pub fn session(
621 &self,
622 workspace: impl Into<String>,
623 options: Option<SessionOptions>,
624 ) -> Result<AgentSession> {
625 let opts = options.unwrap_or_default();
626
627 let llm_client = if let Some(ref model) = opts.model {
628 let (provider_name, model_id) = model
629 .split_once('/')
630 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
631
632 let mut llm_config = self
633 .code_config
634 .llm_config(provider_name, model_id)
635 .with_context(|| {
636 format!("provider '{provider_name}' or model '{model_id}' not found in config")
637 })?;
638
639 if let Some(temp) = opts.temperature {
640 llm_config = llm_config.with_temperature(temp);
641 }
642 if let Some(budget) = opts.thinking_budget {
643 llm_config = llm_config.with_thinking_budget(budget);
644 }
645
646 crate::llm::create_client_with_config(llm_config)
647 } else {
648 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
649 tracing::warn!(
650 "temperature/thinking_budget set without model override — these will be ignored. \
651 Use with_model() to apply LLM parameter overrides."
652 );
653 }
654 self.llm_client.clone()
655 };
656
657 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
660 (Some(global), Some(session)) => {
661 let global = Arc::clone(global);
662 let session_mgr = Arc::clone(session);
663 match tokio::runtime::Handle::try_current() {
664 Ok(handle) => {
665 tokio::task::block_in_place(|| {
666 handle.block_on(async move {
667 for config in session_mgr.all_configs().await {
668 let name = config.name.clone();
669 global.register_server(config).await;
670 if let Err(e) = global.connect(&name).await {
671 tracing::warn!(
672 server = %name,
673 error = %e,
674 "Failed to connect session-level MCP server — skipping"
675 );
676 }
677 }
678 })
679 });
680 }
681 Err(_) => {
682 tracing::warn!(
683 "No async runtime available to merge session-level MCP servers \
684 into global manager — session MCP servers will not be available"
685 );
686 }
687 }
688 SessionOptions {
689 mcp_manager: Some(Arc::clone(self.global_mcp.as_ref().unwrap())),
690 ..opts
691 }
692 }
693 (Some(global), None) => SessionOptions {
694 mcp_manager: Some(Arc::clone(global)),
695 ..opts
696 },
697 _ => opts,
698 };
699
700 self.build_session(workspace.into(), llm_client, &merged_opts)
701 }
702
703 pub fn resume_session(
711 &self,
712 session_id: &str,
713 options: SessionOptions,
714 ) -> Result<AgentSession> {
715 let store = options.session_store.as_ref().ok_or_else(|| {
716 crate::error::CodeError::Session(
717 "resume_session requires a session_store in SessionOptions".to_string(),
718 )
719 })?;
720
721 let data = match tokio::runtime::Handle::try_current() {
723 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
724 .map_err(|e| {
725 crate::error::CodeError::Session(format!(
726 "Failed to load session {}: {}",
727 session_id, e
728 ))
729 })?,
730 Err(_) => {
731 return Err(crate::error::CodeError::Session(
732 "No async runtime available for session resume".to_string(),
733 ))
734 }
735 };
736
737 let data = data.ok_or_else(|| {
738 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
739 })?;
740
741 let mut opts = options;
743 opts.session_id = Some(data.id.clone());
744
745 let llm_client = if let Some(ref model) = opts.model {
746 let (provider_name, model_id) = model
747 .split_once('/')
748 .context("model format must be 'provider/model'")?;
749 let llm_config = self
750 .code_config
751 .llm_config(provider_name, model_id)
752 .with_context(|| {
753 format!("provider '{provider_name}' or model '{model_id}' not found")
754 })?;
755 crate::llm::create_client_with_config(llm_config)
756 } else {
757 self.llm_client.clone()
758 };
759
760 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
761
762 *session.history.write().unwrap() = data.messages;
764
765 Ok(session)
766 }
767
768 fn build_session(
769 &self,
770 workspace: String,
771 llm_client: Arc<dyn LlmClient>,
772 opts: &SessionOptions,
773 ) -> Result<AgentSession> {
774 let canonical =
775 std::fs::canonicalize(&workspace).unwrap_or_else(|_| PathBuf::from(&workspace));
776
777 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
778
779 if let Some(ref mcp) = opts.mcp_manager {
782 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
785 Arc::as_ptr(mcp),
786 self.global_mcp
787 .as_ref()
788 .map(Arc::as_ptr)
789 .unwrap_or(std::ptr::null()),
790 ) {
791 self.global_mcp_tools.clone()
793 } else {
794 match tokio::runtime::Handle::try_current() {
796 Ok(handle) => {
797 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
798 }
799 Err(_) => {
800 tracing::warn!(
801 "No async runtime available for session-level MCP tools — \
802 MCP tools will not be registered"
803 );
804 vec![]
805 }
806 }
807 };
808
809 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
810 std::collections::HashMap::new();
811 for (server, tool) in all_tools {
812 by_server.entry(server).or_default().push(tool);
813 }
814 for (server_name, tools) in by_server {
815 for tool in
816 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
817 {
818 tool_executor.register_dynamic_tool(tool);
819 }
820 }
821 }
822
823 let tool_defs = tool_executor.definitions();
824
825 let mut prompt_slots = opts
827 .prompt_slots
828 .clone()
829 .unwrap_or_else(|| self.config.prompt_slots.clone());
830
831 let base_registry = self
835 .config
836 .skill_registry
837 .as_deref()
838 .map(|r| r.fork())
839 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
840 if let Some(ref r) = opts.skill_registry {
842 for skill in r.all() {
843 base_registry.register_unchecked(skill);
844 }
845 }
846 for dir in &opts.skill_dirs {
848 if let Err(e) = base_registry.load_from_dir(dir) {
849 tracing::warn!(
850 dir = %dir.display(),
851 error = %e,
852 "Failed to load session skill dir — skipping"
853 );
854 }
855 }
856 let effective_registry = Arc::new(base_registry);
857
858 let skill_prompt = effective_registry.to_system_prompt();
860 if !skill_prompt.is_empty() {
861 prompt_slots.extra = match prompt_slots.extra {
862 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
863 None => Some(skill_prompt),
864 };
865 }
866
867 let mut init_warning: Option<String> = None;
869 let memory = {
870 let store = if let Some(ref store) = opts.memory_store {
871 Some(Arc::clone(store))
872 } else if let Some(ref dir) = opts.file_memory_dir {
873 match tokio::runtime::Handle::try_current() {
874 Ok(handle) => {
875 let dir = dir.clone();
876 match tokio::task::block_in_place(|| {
877 handle.block_on(FileMemoryStore::new(dir))
878 }) {
879 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
880 Err(e) => {
881 let msg = format!("Failed to create file memory store: {}", e);
882 tracing::warn!("{}", msg);
883 init_warning = Some(msg);
884 None
885 }
886 }
887 }
888 Err(_) => {
889 let msg =
890 "No async runtime available for file memory store — memory disabled"
891 .to_string();
892 tracing::warn!("{}", msg);
893 init_warning = Some(msg);
894 None
895 }
896 }
897 } else {
898 None
899 };
900 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
901 };
902
903 let base = self.config.clone();
904 let config = AgentConfig {
905 prompt_slots,
906 tools: tool_defs,
907 security_provider: opts.security_provider.clone(),
908 permission_checker: opts.permission_checker.clone(),
909 confirmation_manager: opts.confirmation_manager.clone(),
910 context_providers: opts.context_providers.clone(),
911 planning_enabled: opts.planning_enabled,
912 goal_tracking: opts.goal_tracking,
913 skill_registry: Some(effective_registry),
914 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
915 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
916 circuit_breaker_threshold: opts
917 .circuit_breaker_threshold
918 .unwrap_or(base.circuit_breaker_threshold),
919 auto_compact: opts.auto_compact,
920 auto_compact_threshold: opts
921 .auto_compact_threshold
922 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
923 max_context_tokens: base.max_context_tokens,
924 llm_client: Some(Arc::clone(&llm_client)),
925 memory: memory.clone(),
926 continuation_enabled: opts
927 .continuation_enabled
928 .unwrap_or(base.continuation_enabled),
929 max_continuation_turns: opts
930 .max_continuation_turns
931 .unwrap_or(base.max_continuation_turns),
932 ..base
933 };
934
935 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
938 let command_queue = if let Some(ref queue_config) = opts.queue_config {
939 let session_id = uuid::Uuid::new_v4().to_string();
940 let rt = tokio::runtime::Handle::try_current();
941
942 match rt {
943 Ok(handle) => {
944 let queue = tokio::task::block_in_place(|| {
946 handle.block_on(SessionLaneQueue::new(
947 &session_id,
948 queue_config.clone(),
949 agent_event_tx.clone(),
950 ))
951 });
952 match queue {
953 Ok(q) => {
954 let q = Arc::new(q);
956 let q2 = Arc::clone(&q);
957 tokio::task::block_in_place(|| {
958 handle.block_on(async { q2.start().await.ok() })
959 });
960 Some(q)
961 }
962 Err(e) => {
963 tracing::warn!("Failed to create session lane queue: {}", e);
964 None
965 }
966 }
967 }
968 Err(_) => {
969 tracing::warn!(
970 "No async runtime available for queue creation — queue disabled"
971 );
972 None
973 }
974 }
975 } else {
976 None
977 };
978
979 let mut tool_context = ToolContext::new(canonical.clone());
981 if let Some(ref search_config) = self.code_config.search {
982 tool_context = tool_context.with_search_config(search_config.clone());
983 }
984 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
985
986 #[cfg(feature = "sandbox")]
988 if let Some(ref sandbox_cfg) = opts.sandbox_config {
989 let handle: Arc<dyn crate::sandbox::BashSandbox> =
990 Arc::new(crate::sandbox::BoxSandboxHandle::new(
991 sandbox_cfg.clone(),
992 canonical.display().to_string(),
993 ));
994 tool_executor.registry().set_sandbox(Arc::clone(&handle));
997 tool_context = tool_context.with_sandbox(handle);
998 }
999 #[cfg(not(feature = "sandbox"))]
1000 if opts.sandbox_config.is_some() {
1001 tracing::warn!(
1002 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
1003 — bash commands will run locally"
1004 );
1005 }
1006
1007 let session_id = opts
1008 .session_id
1009 .clone()
1010 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1011
1012 let session_store = if opts.session_store.is_some() {
1014 opts.session_store.clone()
1015 } else if let Some(ref dir) = self.code_config.sessions_dir {
1016 match tokio::runtime::Handle::try_current() {
1017 Ok(handle) => {
1018 let dir = dir.clone();
1019 match tokio::task::block_in_place(|| {
1020 handle.block_on(crate::store::FileSessionStore::new(dir))
1021 }) {
1022 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1023 Err(e) => {
1024 tracing::warn!(
1025 "Failed to create session store from sessions_dir: {}",
1026 e
1027 );
1028 None
1029 }
1030 }
1031 }
1032 Err(_) => {
1033 tracing::warn!(
1034 "No async runtime for sessions_dir store — persistence disabled"
1035 );
1036 None
1037 }
1038 }
1039 } else {
1040 None
1041 };
1042
1043 Ok(AgentSession {
1044 llm_client,
1045 tool_executor,
1046 tool_context,
1047 memory: config.memory.clone(),
1048 config,
1049 workspace: canonical,
1050 session_id,
1051 history: RwLock::new(Vec::new()),
1052 command_queue,
1053 session_store,
1054 auto_save: opts.auto_save,
1055 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1056 init_warning,
1057 command_registry: CommandRegistry::new(),
1058 model_name: opts
1059 .model
1060 .clone()
1061 .or_else(|| self.code_config.default_model.clone())
1062 .unwrap_or_else(|| "unknown".to_string()),
1063 })
1064 }
1065}
1066
1067pub struct AgentSession {
1076 llm_client: Arc<dyn LlmClient>,
1077 tool_executor: Arc<ToolExecutor>,
1078 tool_context: ToolContext,
1079 config: AgentConfig,
1080 workspace: PathBuf,
1081 session_id: String,
1083 history: RwLock<Vec<Message>>,
1085 command_queue: Option<Arc<SessionLaneQueue>>,
1087 memory: Option<Arc<crate::memory::AgentMemory>>,
1089 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1091 auto_save: bool,
1093 hook_engine: Arc<crate::hooks::HookEngine>,
1095 init_warning: Option<String>,
1097 command_registry: CommandRegistry,
1099 model_name: String,
1101}
1102
1103impl std::fmt::Debug for AgentSession {
1104 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1105 f.debug_struct("AgentSession")
1106 .field("session_id", &self.session_id)
1107 .field("workspace", &self.workspace.display().to_string())
1108 .field("auto_save", &self.auto_save)
1109 .finish()
1110 }
1111}
1112
1113impl AgentSession {
1114 fn build_agent_loop(&self) -> AgentLoop {
1118 let mut config = self.config.clone();
1119 config.hook_engine =
1120 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1121 let mut agent_loop = AgentLoop::new(
1122 self.llm_client.clone(),
1123 self.tool_executor.clone(),
1124 self.tool_context.clone(),
1125 config,
1126 );
1127 if let Some(ref queue) = self.command_queue {
1128 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1129 }
1130 agent_loop
1131 }
1132
1133 fn build_command_context(&self) -> CommandContext {
1135 let history = self.history.read().unwrap();
1136
1137 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1139
1140 let mut mcp_map: std::collections::HashMap<String, usize> =
1142 std::collections::HashMap::new();
1143 for name in &tool_names {
1144 if let Some(rest) = name.strip_prefix("mcp__") {
1145 if let Some((server, _)) = rest.split_once("__") {
1146 *mcp_map.entry(server.to_string()).or_default() += 1;
1147 }
1148 }
1149 }
1150 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1151 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1152
1153 CommandContext {
1154 session_id: self.session_id.clone(),
1155 workspace: self.workspace.display().to_string(),
1156 model: self.model_name.clone(),
1157 history_len: history.len(),
1158 total_tokens: 0,
1159 total_cost: 0.0,
1160 tool_names,
1161 mcp_servers,
1162 }
1163 }
1164
1165 pub fn command_registry(&self) -> &CommandRegistry {
1167 &self.command_registry
1168 }
1169
1170 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1172 self.command_registry.register(cmd);
1173 }
1174
1175 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1184 if CommandRegistry::is_command(prompt) {
1186 let ctx = self.build_command_context();
1187 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1188 return Ok(AgentResult {
1189 text: output.text,
1190 messages: history
1191 .map(|h| h.to_vec())
1192 .unwrap_or_else(|| self.history.read().unwrap().clone()),
1193 tool_calls_count: 0,
1194 usage: crate::llm::TokenUsage::default(),
1195 });
1196 }
1197 }
1198
1199 if let Some(ref w) = self.init_warning {
1200 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1201 }
1202 let agent_loop = self.build_agent_loop();
1203
1204 let use_internal = history.is_none();
1205 let effective_history = match history {
1206 Some(h) => h.to_vec(),
1207 None => self.history.read().unwrap().clone(),
1208 };
1209
1210 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1211
1212 if use_internal {
1215 *self.history.write().unwrap() = result.messages.clone();
1216
1217 if self.auto_save {
1219 if let Err(e) = self.save().await {
1220 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1221 }
1222 }
1223 }
1224
1225 Ok(result)
1226 }
1227
1228 pub async fn send_with_attachments(
1233 &self,
1234 prompt: &str,
1235 attachments: &[crate::llm::Attachment],
1236 history: Option<&[Message]>,
1237 ) -> Result<AgentResult> {
1238 let use_internal = history.is_none();
1242 let mut effective_history = match history {
1243 Some(h) => h.to_vec(),
1244 None => self.history.read().unwrap().clone(),
1245 };
1246 effective_history.push(Message::user_with_attachments(prompt, attachments));
1247
1248 let agent_loop = self.build_agent_loop();
1249 let result = agent_loop
1250 .execute_from_messages(effective_history, None, None)
1251 .await?;
1252
1253 if use_internal {
1254 *self.history.write().unwrap() = result.messages.clone();
1255 if self.auto_save {
1256 if let Err(e) = self.save().await {
1257 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1258 }
1259 }
1260 }
1261
1262 Ok(result)
1263 }
1264
1265 pub async fn stream_with_attachments(
1270 &self,
1271 prompt: &str,
1272 attachments: &[crate::llm::Attachment],
1273 history: Option<&[Message]>,
1274 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1275 let (tx, rx) = mpsc::channel(256);
1276 let mut effective_history = match history {
1277 Some(h) => h.to_vec(),
1278 None => self.history.read().unwrap().clone(),
1279 };
1280 effective_history.push(Message::user_with_attachments(prompt, attachments));
1281
1282 let agent_loop = self.build_agent_loop();
1283 let handle = tokio::spawn(async move {
1284 let _ = agent_loop
1285 .execute_from_messages(effective_history, None, Some(tx))
1286 .await;
1287 });
1288
1289 Ok((rx, handle))
1290 }
1291
1292 pub async fn stream(
1302 &self,
1303 prompt: &str,
1304 history: Option<&[Message]>,
1305 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1306 if CommandRegistry::is_command(prompt) {
1308 let ctx = self.build_command_context();
1309 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1310 let (tx, rx) = mpsc::channel(256);
1311 let handle = tokio::spawn(async move {
1312 let _ = tx
1313 .send(AgentEvent::TextDelta {
1314 text: output.text.clone(),
1315 })
1316 .await;
1317 let _ = tx
1318 .send(AgentEvent::End {
1319 text: output.text.clone(),
1320 usage: crate::llm::TokenUsage::default(),
1321 })
1322 .await;
1323 });
1324 return Ok((rx, handle));
1325 }
1326 }
1327
1328 let (tx, rx) = mpsc::channel(256);
1329 let agent_loop = self.build_agent_loop();
1330 let effective_history = match history {
1331 Some(h) => h.to_vec(),
1332 None => self.history.read().unwrap().clone(),
1333 };
1334 let prompt = prompt.to_string();
1335
1336 let handle = tokio::spawn(async move {
1337 let _ = agent_loop
1338 .execute(&effective_history, &prompt, Some(tx))
1339 .await;
1340 });
1341
1342 Ok((rx, handle))
1343 }
1344
1345 pub fn history(&self) -> Vec<Message> {
1347 self.history.read().unwrap().clone()
1348 }
1349
1350 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1352 self.memory.as_ref()
1353 }
1354
1355 pub fn id(&self) -> &str {
1357 &self.session_id
1358 }
1359
1360 pub fn workspace(&self) -> &std::path::Path {
1362 &self.workspace
1363 }
1364
1365 pub fn init_warning(&self) -> Option<&str> {
1367 self.init_warning.as_deref()
1368 }
1369
1370 pub fn session_id(&self) -> &str {
1372 &self.session_id
1373 }
1374
1375 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1381 self.hook_engine.register(hook);
1382 }
1383
1384 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1386 self.hook_engine.unregister(hook_id)
1387 }
1388
1389 pub fn register_hook_handler(
1391 &self,
1392 hook_id: &str,
1393 handler: Arc<dyn crate::hooks::HookHandler>,
1394 ) {
1395 self.hook_engine.register_handler(hook_id, handler);
1396 }
1397
1398 pub fn unregister_hook_handler(&self, hook_id: &str) {
1400 self.hook_engine.unregister_handler(hook_id);
1401 }
1402
1403 pub fn hook_count(&self) -> usize {
1405 self.hook_engine.hook_count()
1406 }
1407
1408 pub async fn save(&self) -> Result<()> {
1412 let store = match &self.session_store {
1413 Some(s) => s,
1414 None => return Ok(()),
1415 };
1416
1417 let history = self.history.read().unwrap().clone();
1418 let now = chrono::Utc::now().timestamp();
1419
1420 let data = crate::store::SessionData {
1421 id: self.session_id.clone(),
1422 config: crate::session::SessionConfig {
1423 name: String::new(),
1424 workspace: self.workspace.display().to_string(),
1425 system_prompt: Some(self.config.prompt_slots.build()),
1426 max_context_length: 200_000,
1427 auto_compact: false,
1428 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1429 storage_type: crate::config::StorageBackend::File,
1430 queue_config: None,
1431 confirmation_policy: None,
1432 permission_policy: None,
1433 parent_id: None,
1434 security_config: None,
1435 hook_engine: None,
1436 planning_enabled: self.config.planning_enabled,
1437 goal_tracking: self.config.goal_tracking,
1438 },
1439 state: crate::session::SessionState::Active,
1440 messages: history,
1441 context_usage: crate::session::ContextUsage::default(),
1442 total_usage: crate::llm::TokenUsage::default(),
1443 total_cost: 0.0,
1444 model_name: None,
1445 cost_records: Vec::new(),
1446 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1447 thinking_enabled: false,
1448 thinking_budget: None,
1449 created_at: now,
1450 updated_at: now,
1451 llm_config: None,
1452 tasks: Vec::new(),
1453 parent_id: None,
1454 };
1455
1456 store.save(&data).await?;
1457 tracing::debug!("Session {} saved", self.session_id);
1458 Ok(())
1459 }
1460
1461 pub async fn read_file(&self, path: &str) -> Result<String> {
1463 let args = serde_json::json!({ "file_path": path });
1464 let result = self.tool_executor.execute("read", &args).await?;
1465 Ok(result.output)
1466 }
1467
1468 pub async fn bash(&self, command: &str) -> Result<String> {
1473 let args = serde_json::json!({ "command": command });
1474 let result = self
1475 .tool_executor
1476 .execute_with_context("bash", &args, &self.tool_context)
1477 .await?;
1478 Ok(result.output)
1479 }
1480
1481 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1483 let args = serde_json::json!({ "pattern": pattern });
1484 let result = self.tool_executor.execute("glob", &args).await?;
1485 let files: Vec<String> = result
1486 .output
1487 .lines()
1488 .filter(|l| !l.is_empty())
1489 .map(|l| l.to_string())
1490 .collect();
1491 Ok(files)
1492 }
1493
1494 pub async fn grep(&self, pattern: &str) -> Result<String> {
1496 let args = serde_json::json!({ "pattern": pattern });
1497 let result = self.tool_executor.execute("grep", &args).await?;
1498 Ok(result.output)
1499 }
1500
1501 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1503 let result = self.tool_executor.execute(name, &args).await?;
1504 Ok(ToolCallResult {
1505 name: name.to_string(),
1506 output: result.output,
1507 exit_code: result.exit_code,
1508 })
1509 }
1510
1511 pub fn has_queue(&self) -> bool {
1517 self.command_queue.is_some()
1518 }
1519
1520 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1524 if let Some(ref queue) = self.command_queue {
1525 queue.set_lane_handler(lane, config).await;
1526 }
1527 }
1528
1529 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1533 if let Some(ref queue) = self.command_queue {
1534 queue.complete_external_task(task_id, result).await
1535 } else {
1536 false
1537 }
1538 }
1539
1540 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1542 if let Some(ref queue) = self.command_queue {
1543 queue.pending_external_tasks().await
1544 } else {
1545 Vec::new()
1546 }
1547 }
1548
1549 pub async fn queue_stats(&self) -> SessionQueueStats {
1551 if let Some(ref queue) = self.command_queue {
1552 queue.stats().await
1553 } else {
1554 SessionQueueStats::default()
1555 }
1556 }
1557
1558 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1560 if let Some(ref queue) = self.command_queue {
1561 queue.metrics_snapshot().await
1562 } else {
1563 None
1564 }
1565 }
1566
1567 pub async fn submit(
1573 &self,
1574 lane: SessionLane,
1575 command: Box<dyn crate::queue::SessionCommand>,
1576 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1577 let queue = self
1578 .command_queue
1579 .as_ref()
1580 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1581 Ok(queue.submit(lane, command).await)
1582 }
1583
1584 pub async fn submit_batch(
1591 &self,
1592 lane: SessionLane,
1593 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1594 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
1595 {
1596 let queue = self
1597 .command_queue
1598 .as_ref()
1599 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1600 Ok(queue.submit_batch(lane, commands).await)
1601 }
1602
1603 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1605 if let Some(ref queue) = self.command_queue {
1606 queue.dead_letters().await
1607 } else {
1608 Vec::new()
1609 }
1610 }
1611
1612 pub async fn add_mcp_server(
1624 &self,
1625 manager: Arc<crate::mcp::manager::McpManager>,
1626 server_name: &str,
1627 ) -> crate::error::Result<usize> {
1628 manager
1629 .connect(server_name)
1630 .await
1631 .map_err(|e| crate::error::CodeError::Tool {
1632 tool: server_name.to_string(),
1633 message: format!("Failed to connect MCP server: {}", e),
1634 })?;
1635
1636 let tools = manager.get_server_tools(server_name).await;
1637 let count = tools.len();
1638
1639 for tool in crate::mcp::tools::create_mcp_tools(server_name, tools, Arc::clone(&manager)) {
1640 self.tool_executor.register_dynamic_tool(tool);
1641 }
1642
1643 tracing::info!(
1644 session_id = %self.session_id,
1645 server = server_name,
1646 tools = count,
1647 "MCP server added to live session"
1648 );
1649
1650 Ok(count)
1651 }
1652}
1653
1654#[cfg(test)]
1659mod tests {
1660 use super::*;
1661 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1662 use crate::store::SessionStore;
1663
1664 #[tokio::test]
1665 async fn test_session_submit_no_queue_returns_err() {
1666 let agent = Agent::from_config(test_config()).await.unwrap();
1667 let session = agent.session(".", None).unwrap();
1668 struct Noop;
1669 #[async_trait::async_trait]
1670 impl crate::queue::SessionCommand for Noop {
1671 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1672 Ok(serde_json::json!(null))
1673 }
1674 fn command_type(&self) -> &str {
1675 "noop"
1676 }
1677 }
1678 let result: anyhow::Result<
1679 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
1680 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
1681 assert!(result.is_err());
1682 assert!(result.unwrap_err().to_string().contains("No queue"));
1683 }
1684
1685 #[tokio::test]
1686 async fn test_session_submit_batch_no_queue_returns_err() {
1687 let agent = Agent::from_config(test_config()).await.unwrap();
1688 let session = agent.session(".", None).unwrap();
1689 struct Noop;
1690 #[async_trait::async_trait]
1691 impl crate::queue::SessionCommand for Noop {
1692 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1693 Ok(serde_json::json!(null))
1694 }
1695 fn command_type(&self) -> &str {
1696 "noop"
1697 }
1698 }
1699 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1700 let result: anyhow::Result<
1701 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
1702 > = session.submit_batch(SessionLane::Query, cmds).await;
1703 assert!(result.is_err());
1704 assert!(result.unwrap_err().to_string().contains("No queue"));
1705 }
1706
1707 fn test_config() -> CodeConfig {
1708 CodeConfig {
1709 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1710 providers: vec![
1711 ProviderConfig {
1712 name: "anthropic".to_string(),
1713 api_key: Some("test-key".to_string()),
1714 base_url: None,
1715 models: vec![ModelConfig {
1716 id: "claude-sonnet-4-20250514".to_string(),
1717 name: "Claude Sonnet 4".to_string(),
1718 family: "claude-sonnet".to_string(),
1719 api_key: None,
1720 base_url: None,
1721 attachment: false,
1722 reasoning: false,
1723 tool_call: true,
1724 temperature: true,
1725 release_date: None,
1726 modalities: ModelModalities::default(),
1727 cost: Default::default(),
1728 limit: Default::default(),
1729 }],
1730 },
1731 ProviderConfig {
1732 name: "openai".to_string(),
1733 api_key: Some("test-openai-key".to_string()),
1734 base_url: None,
1735 models: vec![ModelConfig {
1736 id: "gpt-4o".to_string(),
1737 name: "GPT-4o".to_string(),
1738 family: "gpt-4".to_string(),
1739 api_key: None,
1740 base_url: None,
1741 attachment: false,
1742 reasoning: false,
1743 tool_call: true,
1744 temperature: true,
1745 release_date: None,
1746 modalities: ModelModalities::default(),
1747 cost: Default::default(),
1748 limit: Default::default(),
1749 }],
1750 },
1751 ],
1752 ..Default::default()
1753 }
1754 }
1755
1756 #[tokio::test]
1757 async fn test_from_config() {
1758 let agent = Agent::from_config(test_config()).await;
1759 assert!(agent.is_ok());
1760 }
1761
1762 #[tokio::test]
1763 async fn test_session_default() {
1764 let agent = Agent::from_config(test_config()).await.unwrap();
1765 let session = agent.session("/tmp/test-workspace", None);
1766 assert!(session.is_ok());
1767 let debug = format!("{:?}", session.unwrap());
1768 assert!(debug.contains("AgentSession"));
1769 }
1770
1771 #[tokio::test]
1772 async fn test_session_with_model_override() {
1773 let agent = Agent::from_config(test_config()).await.unwrap();
1774 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1775 let session = agent.session("/tmp/test-workspace", Some(opts));
1776 assert!(session.is_ok());
1777 }
1778
1779 #[tokio::test]
1780 async fn test_session_with_invalid_model_format() {
1781 let agent = Agent::from_config(test_config()).await.unwrap();
1782 let opts = SessionOptions::new().with_model("gpt-4o");
1783 let session = agent.session("/tmp/test-workspace", Some(opts));
1784 assert!(session.is_err());
1785 }
1786
1787 #[tokio::test]
1788 async fn test_session_with_model_not_found() {
1789 let agent = Agent::from_config(test_config()).await.unwrap();
1790 let opts = SessionOptions::new().with_model("openai/nonexistent");
1791 let session = agent.session("/tmp/test-workspace", Some(opts));
1792 assert!(session.is_err());
1793 }
1794
1795 #[tokio::test]
1796 async fn test_new_with_hcl_string() {
1797 let hcl = r#"
1798 default_model = "anthropic/claude-sonnet-4-20250514"
1799 providers {
1800 name = "anthropic"
1801 api_key = "test-key"
1802 models {
1803 id = "claude-sonnet-4-20250514"
1804 name = "Claude Sonnet 4"
1805 }
1806 }
1807 "#;
1808 let agent = Agent::new(hcl).await;
1809 assert!(agent.is_ok());
1810 }
1811
1812 #[tokio::test]
1813 async fn test_create_alias_hcl() {
1814 let hcl = r#"
1815 default_model = "anthropic/claude-sonnet-4-20250514"
1816 providers {
1817 name = "anthropic"
1818 api_key = "test-key"
1819 models {
1820 id = "claude-sonnet-4-20250514"
1821 name = "Claude Sonnet 4"
1822 }
1823 }
1824 "#;
1825 let agent = Agent::create(hcl).await;
1826 assert!(agent.is_ok());
1827 }
1828
1829 #[tokio::test]
1830 async fn test_create_and_new_produce_same_result() {
1831 let hcl = r#"
1832 default_model = "anthropic/claude-sonnet-4-20250514"
1833 providers {
1834 name = "anthropic"
1835 api_key = "test-key"
1836 models {
1837 id = "claude-sonnet-4-20250514"
1838 name = "Claude Sonnet 4"
1839 }
1840 }
1841 "#;
1842 let agent_new = Agent::new(hcl).await;
1843 let agent_create = Agent::create(hcl).await;
1844 assert!(agent_new.is_ok());
1845 assert!(agent_create.is_ok());
1846
1847 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
1849 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
1850 assert!(session_new.is_ok());
1851 assert!(session_create.is_ok());
1852 }
1853
1854 #[test]
1855 fn test_from_config_requires_default_model() {
1856 let rt = tokio::runtime::Runtime::new().unwrap();
1857 let config = CodeConfig {
1858 providers: vec![ProviderConfig {
1859 name: "anthropic".to_string(),
1860 api_key: Some("test-key".to_string()),
1861 base_url: None,
1862 models: vec![],
1863 }],
1864 ..Default::default()
1865 };
1866 let result = rt.block_on(Agent::from_config(config));
1867 assert!(result.is_err());
1868 }
1869
1870 #[tokio::test]
1871 async fn test_history_empty_on_new_session() {
1872 let agent = Agent::from_config(test_config()).await.unwrap();
1873 let session = agent.session("/tmp/test-workspace", None).unwrap();
1874 assert!(session.history().is_empty());
1875 }
1876
1877 #[tokio::test]
1878 async fn test_session_options_with_agent_dir() {
1879 let opts = SessionOptions::new()
1880 .with_agent_dir("/tmp/agents")
1881 .with_agent_dir("/tmp/more-agents");
1882 assert_eq!(opts.agent_dirs.len(), 2);
1883 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
1884 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
1885 }
1886
1887 #[test]
1892 fn test_session_options_with_queue_config() {
1893 let qc = SessionQueueConfig::default().with_lane_features();
1894 let opts = SessionOptions::new().with_queue_config(qc.clone());
1895 assert!(opts.queue_config.is_some());
1896
1897 let config = opts.queue_config.unwrap();
1898 assert!(config.enable_dlq);
1899 assert!(config.enable_metrics);
1900 assert!(config.enable_alerts);
1901 assert_eq!(config.default_timeout_ms, Some(60_000));
1902 }
1903
1904 #[tokio::test(flavor = "multi_thread")]
1905 async fn test_session_with_queue_config() {
1906 let agent = Agent::from_config(test_config()).await.unwrap();
1907 let qc = SessionQueueConfig::default();
1908 let opts = SessionOptions::new().with_queue_config(qc);
1909 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
1910 assert!(session.is_ok());
1911 let session = session.unwrap();
1912 assert!(session.has_queue());
1913 }
1914
1915 #[tokio::test]
1916 async fn test_session_without_queue_config() {
1917 let agent = Agent::from_config(test_config()).await.unwrap();
1918 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
1919 assert!(!session.has_queue());
1920 }
1921
1922 #[tokio::test]
1923 async fn test_session_queue_stats_without_queue() {
1924 let agent = Agent::from_config(test_config()).await.unwrap();
1925 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
1926 let stats = session.queue_stats().await;
1927 assert_eq!(stats.total_pending, 0);
1929 assert_eq!(stats.total_active, 0);
1930 }
1931
1932 #[tokio::test(flavor = "multi_thread")]
1933 async fn test_session_queue_stats_with_queue() {
1934 let agent = Agent::from_config(test_config()).await.unwrap();
1935 let qc = SessionQueueConfig::default();
1936 let opts = SessionOptions::new().with_queue_config(qc);
1937 let session = agent
1938 .session("/tmp/test-workspace-qstats", Some(opts))
1939 .unwrap();
1940 let stats = session.queue_stats().await;
1941 assert_eq!(stats.total_pending, 0);
1943 assert_eq!(stats.total_active, 0);
1944 }
1945
1946 #[tokio::test(flavor = "multi_thread")]
1947 async fn test_session_pending_external_tasks_empty() {
1948 let agent = Agent::from_config(test_config()).await.unwrap();
1949 let qc = SessionQueueConfig::default();
1950 let opts = SessionOptions::new().with_queue_config(qc);
1951 let session = agent
1952 .session("/tmp/test-workspace-ext", Some(opts))
1953 .unwrap();
1954 let tasks = session.pending_external_tasks().await;
1955 assert!(tasks.is_empty());
1956 }
1957
1958 #[tokio::test(flavor = "multi_thread")]
1959 async fn test_session_dead_letters_empty() {
1960 let agent = Agent::from_config(test_config()).await.unwrap();
1961 let qc = SessionQueueConfig::default().with_dlq(Some(100));
1962 let opts = SessionOptions::new().with_queue_config(qc);
1963 let session = agent
1964 .session("/tmp/test-workspace-dlq", Some(opts))
1965 .unwrap();
1966 let dead = session.dead_letters().await;
1967 assert!(dead.is_empty());
1968 }
1969
1970 #[tokio::test(flavor = "multi_thread")]
1971 async fn test_session_queue_metrics_disabled() {
1972 let agent = Agent::from_config(test_config()).await.unwrap();
1973 let qc = SessionQueueConfig::default();
1975 let opts = SessionOptions::new().with_queue_config(qc);
1976 let session = agent
1977 .session("/tmp/test-workspace-nomet", Some(opts))
1978 .unwrap();
1979 let metrics = session.queue_metrics().await;
1980 assert!(metrics.is_none());
1981 }
1982
1983 #[tokio::test(flavor = "multi_thread")]
1984 async fn test_session_queue_metrics_enabled() {
1985 let agent = Agent::from_config(test_config()).await.unwrap();
1986 let qc = SessionQueueConfig::default().with_metrics();
1987 let opts = SessionOptions::new().with_queue_config(qc);
1988 let session = agent
1989 .session("/tmp/test-workspace-met", Some(opts))
1990 .unwrap();
1991 let metrics = session.queue_metrics().await;
1992 assert!(metrics.is_some());
1993 }
1994
1995 #[tokio::test(flavor = "multi_thread")]
1996 async fn test_session_set_lane_handler() {
1997 let agent = Agent::from_config(test_config()).await.unwrap();
1998 let qc = SessionQueueConfig::default();
1999 let opts = SessionOptions::new().with_queue_config(qc);
2000 let session = agent
2001 .session("/tmp/test-workspace-handler", Some(opts))
2002 .unwrap();
2003
2004 session
2006 .set_lane_handler(
2007 SessionLane::Execute,
2008 LaneHandlerConfig {
2009 mode: crate::queue::TaskHandlerMode::External,
2010 timeout_ms: 30_000,
2011 },
2012 )
2013 .await;
2014
2015 }
2018
2019 #[tokio::test(flavor = "multi_thread")]
2024 async fn test_session_has_id() {
2025 let agent = Agent::from_config(test_config()).await.unwrap();
2026 let session = agent.session("/tmp/test-ws-id", None).unwrap();
2027 assert!(!session.session_id().is_empty());
2029 assert_eq!(session.session_id().len(), 36); }
2031
2032 #[tokio::test(flavor = "multi_thread")]
2033 async fn test_session_explicit_id() {
2034 let agent = Agent::from_config(test_config()).await.unwrap();
2035 let opts = SessionOptions::new().with_session_id("my-session-42");
2036 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
2037 assert_eq!(session.session_id(), "my-session-42");
2038 }
2039
2040 #[tokio::test(flavor = "multi_thread")]
2041 async fn test_session_save_no_store() {
2042 let agent = Agent::from_config(test_config()).await.unwrap();
2043 let session = agent.session("/tmp/test-ws-save", None).unwrap();
2044 session.save().await.unwrap();
2046 }
2047
2048 #[tokio::test(flavor = "multi_thread")]
2049 async fn test_session_save_and_load() {
2050 let store = Arc::new(crate::store::MemorySessionStore::new());
2051 let agent = Agent::from_config(test_config()).await.unwrap();
2052
2053 let opts = SessionOptions::new()
2054 .with_session_store(store.clone())
2055 .with_session_id("persist-test");
2056 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
2057
2058 session.save().await.unwrap();
2060
2061 assert!(store.exists("persist-test").await.unwrap());
2063
2064 let data = store.load("persist-test").await.unwrap().unwrap();
2065 assert_eq!(data.id, "persist-test");
2066 assert!(data.messages.is_empty());
2067 }
2068
2069 #[tokio::test(flavor = "multi_thread")]
2070 async fn test_session_save_with_history() {
2071 let store = Arc::new(crate::store::MemorySessionStore::new());
2072 let agent = Agent::from_config(test_config()).await.unwrap();
2073
2074 let opts = SessionOptions::new()
2075 .with_session_store(store.clone())
2076 .with_session_id("history-test");
2077 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
2078
2079 {
2081 let mut h = session.history.write().unwrap();
2082 h.push(Message::user("Hello"));
2083 h.push(Message::user("How are you?"));
2084 }
2085
2086 session.save().await.unwrap();
2087
2088 let data = store.load("history-test").await.unwrap().unwrap();
2089 assert_eq!(data.messages.len(), 2);
2090 }
2091
2092 #[tokio::test(flavor = "multi_thread")]
2093 async fn test_resume_session() {
2094 let store = Arc::new(crate::store::MemorySessionStore::new());
2095 let agent = Agent::from_config(test_config()).await.unwrap();
2096
2097 let opts = SessionOptions::new()
2099 .with_session_store(store.clone())
2100 .with_session_id("resume-test");
2101 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
2102 {
2103 let mut h = session.history.write().unwrap();
2104 h.push(Message::user("What is Rust?"));
2105 h.push(Message::user("Tell me more"));
2106 }
2107 session.save().await.unwrap();
2108
2109 let opts2 = SessionOptions::new().with_session_store(store.clone());
2111 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2112
2113 assert_eq!(resumed.session_id(), "resume-test");
2114 let history = resumed.history();
2115 assert_eq!(history.len(), 2);
2116 assert_eq!(history[0].text(), "What is Rust?");
2117 }
2118
2119 #[tokio::test(flavor = "multi_thread")]
2120 async fn test_resume_session_not_found() {
2121 let store = Arc::new(crate::store::MemorySessionStore::new());
2122 let agent = Agent::from_config(test_config()).await.unwrap();
2123
2124 let opts = SessionOptions::new().with_session_store(store.clone());
2125 let result = agent.resume_session("nonexistent", opts);
2126 assert!(result.is_err());
2127 assert!(result.unwrap_err().to_string().contains("not found"));
2128 }
2129
2130 #[tokio::test(flavor = "multi_thread")]
2131 async fn test_resume_session_no_store() {
2132 let agent = Agent::from_config(test_config()).await.unwrap();
2133 let opts = SessionOptions::new();
2134 let result = agent.resume_session("any-id", opts);
2135 assert!(result.is_err());
2136 assert!(result.unwrap_err().to_string().contains("session_store"));
2137 }
2138
2139 #[tokio::test(flavor = "multi_thread")]
2140 async fn test_file_session_store_persistence() {
2141 let dir = tempfile::TempDir::new().unwrap();
2142 let store = Arc::new(
2143 crate::store::FileSessionStore::new(dir.path())
2144 .await
2145 .unwrap(),
2146 );
2147 let agent = Agent::from_config(test_config()).await.unwrap();
2148
2149 let opts = SessionOptions::new()
2151 .with_session_store(store.clone())
2152 .with_session_id("file-persist");
2153 let session = agent
2154 .session("/tmp/test-ws-file-persist", Some(opts))
2155 .unwrap();
2156 {
2157 let mut h = session.history.write().unwrap();
2158 h.push(Message::user("test message"));
2159 }
2160 session.save().await.unwrap();
2161
2162 let store2 = Arc::new(
2164 crate::store::FileSessionStore::new(dir.path())
2165 .await
2166 .unwrap(),
2167 );
2168 let data = store2.load("file-persist").await.unwrap().unwrap();
2169 assert_eq!(data.messages.len(), 1);
2170 }
2171
2172 #[tokio::test(flavor = "multi_thread")]
2173 async fn test_session_options_builders() {
2174 let opts = SessionOptions::new()
2175 .with_session_id("test-id")
2176 .with_auto_save(true);
2177 assert_eq!(opts.session_id, Some("test-id".to_string()));
2178 assert!(opts.auto_save);
2179 }
2180
2181 #[test]
2186 fn test_session_options_with_sandbox_sets_config() {
2187 use crate::sandbox::SandboxConfig;
2188 let cfg = SandboxConfig {
2189 image: "ubuntu:22.04".into(),
2190 memory_mb: 1024,
2191 ..SandboxConfig::default()
2192 };
2193 let opts = SessionOptions::new().with_sandbox(cfg);
2194 assert!(opts.sandbox_config.is_some());
2195 let sc = opts.sandbox_config.unwrap();
2196 assert_eq!(sc.image, "ubuntu:22.04");
2197 assert_eq!(sc.memory_mb, 1024);
2198 }
2199
2200 #[test]
2201 fn test_session_options_default_has_no_sandbox() {
2202 let opts = SessionOptions::default();
2203 assert!(opts.sandbox_config.is_none());
2204 }
2205
2206 #[tokio::test]
2207 async fn test_session_debug_includes_sandbox_config() {
2208 use crate::sandbox::SandboxConfig;
2209 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
2210 let debug = format!("{:?}", opts);
2211 assert!(debug.contains("sandbox_config"));
2212 }
2213
2214 #[tokio::test]
2215 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2216 let agent = Agent::from_config(test_config()).await.unwrap();
2219 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2220 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2222 assert!(session.is_ok());
2223 }
2224
2225 #[tokio::test(flavor = "multi_thread")]
2230 async fn test_session_with_memory_store() {
2231 use a3s_memory::InMemoryStore;
2232 let store = Arc::new(InMemoryStore::new());
2233 let agent = Agent::from_config(test_config()).await.unwrap();
2234 let opts = SessionOptions::new().with_memory(store);
2235 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2236 assert!(session.memory().is_some());
2237 }
2238
2239 #[tokio::test(flavor = "multi_thread")]
2240 async fn test_session_without_memory_store() {
2241 let agent = Agent::from_config(test_config()).await.unwrap();
2242 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2243 assert!(session.memory().is_none());
2244 }
2245
2246 #[tokio::test(flavor = "multi_thread")]
2247 async fn test_session_memory_wired_into_config() {
2248 use a3s_memory::InMemoryStore;
2249 let store = Arc::new(InMemoryStore::new());
2250 let agent = Agent::from_config(test_config()).await.unwrap();
2251 let opts = SessionOptions::new().with_memory(store);
2252 let session = agent
2253 .session("/tmp/test-ws-mem-config", Some(opts))
2254 .unwrap();
2255 assert!(session.memory().is_some());
2257 }
2258
2259 #[tokio::test(flavor = "multi_thread")]
2260 async fn test_session_with_file_memory() {
2261 let dir = tempfile::TempDir::new().unwrap();
2262 let agent = Agent::from_config(test_config()).await.unwrap();
2263 let opts = SessionOptions::new().with_file_memory(dir.path());
2264 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2265 assert!(session.memory().is_some());
2266 }
2267
2268 #[tokio::test(flavor = "multi_thread")]
2269 async fn test_memory_remember_and_recall() {
2270 use a3s_memory::InMemoryStore;
2271 let store = Arc::new(InMemoryStore::new());
2272 let agent = Agent::from_config(test_config()).await.unwrap();
2273 let opts = SessionOptions::new().with_memory(store);
2274 let session = agent
2275 .session("/tmp/test-ws-mem-recall", Some(opts))
2276 .unwrap();
2277
2278 let memory = session.memory().unwrap();
2279 memory
2280 .remember_success("write a file", &["write".to_string()], "done")
2281 .await
2282 .unwrap();
2283
2284 let results = memory.recall_similar("write", 5).await.unwrap();
2285 assert!(!results.is_empty());
2286 let stats = memory.stats().await.unwrap();
2287 assert_eq!(stats.long_term_count, 1);
2288 }
2289
2290 #[tokio::test(flavor = "multi_thread")]
2295 async fn test_session_tool_timeout_configured() {
2296 let agent = Agent::from_config(test_config()).await.unwrap();
2297 let opts = SessionOptions::new().with_tool_timeout(5000);
2298 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2299 assert!(!session.id().is_empty());
2300 }
2301
2302 #[tokio::test(flavor = "multi_thread")]
2307 async fn test_session_without_queue_builds_ok() {
2308 let agent = Agent::from_config(test_config()).await.unwrap();
2309 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2310 assert!(!session.id().is_empty());
2311 }
2312
2313 #[tokio::test(flavor = "multi_thread")]
2318 async fn test_concurrent_history_reads() {
2319 let agent = Agent::from_config(test_config()).await.unwrap();
2320 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2321
2322 let handles: Vec<_> = (0..10)
2323 .map(|_| {
2324 let s = Arc::clone(&session);
2325 tokio::spawn(async move { s.history().len() })
2326 })
2327 .collect();
2328
2329 for h in handles {
2330 h.await.unwrap();
2331 }
2332 }
2333
2334 #[tokio::test(flavor = "multi_thread")]
2339 async fn test_session_no_init_warning_without_file_memory() {
2340 let agent = Agent::from_config(test_config()).await.unwrap();
2341 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2342 assert!(session.init_warning().is_none());
2343 }
2344
2345 #[tokio::test(flavor = "multi_thread")]
2346 async fn test_session_with_mcp_manager_builds_ok() {
2347 use crate::mcp::manager::McpManager;
2348 let mcp = Arc::new(McpManager::new());
2349 let agent = Agent::from_config(test_config()).await.unwrap();
2350 let opts = SessionOptions::new().with_mcp(mcp);
2351 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2353 assert!(!session.id().is_empty());
2354 }
2355
2356 #[test]
2357 fn test_session_command_is_pub() {
2358 use crate::SessionCommand;
2360 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
2361 }
2362
2363 #[tokio::test(flavor = "multi_thread")]
2364 async fn test_session_submit_with_queue_executes() {
2365 let agent = Agent::from_config(test_config()).await.unwrap();
2366 let qc = SessionQueueConfig::default();
2367 let opts = SessionOptions::new().with_queue_config(qc);
2368 let session = agent
2369 .session("/tmp/test-ws-submit-exec", Some(opts))
2370 .unwrap();
2371
2372 struct Echo(serde_json::Value);
2373 #[async_trait::async_trait]
2374 impl crate::queue::SessionCommand for Echo {
2375 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2376 Ok(self.0.clone())
2377 }
2378 fn command_type(&self) -> &str {
2379 "echo"
2380 }
2381 }
2382
2383 let rx = session
2384 .submit(
2385 SessionLane::Query,
2386 Box::new(Echo(serde_json::json!({"ok": true}))),
2387 )
2388 .await
2389 .expect("submit should succeed with queue configured");
2390
2391 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2392 .await
2393 .expect("timed out waiting for command result")
2394 .expect("channel closed before result")
2395 .expect("command returned an error");
2396
2397 assert_eq!(result["ok"], true);
2398 }
2399}