1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::error::Result;
23use crate::llm::{LlmClient, Message};
24use crate::prompts::SystemPromptSlots;
25use crate::queue::{
26 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
27 SessionQueueStats,
28};
29use crate::session_lane_queue::SessionLaneQueue;
30use crate::tools::{ToolContext, ToolExecutor};
31use a3s_lane::{DeadLetter, MetricsSnapshot};
32use a3s_memory::{FileMemoryStore, MemoryStore};
33use anyhow::Context;
34use std::path::{Path, PathBuf};
35use std::sync::{Arc, RwLock};
36use tokio::sync::{broadcast, mpsc};
37use tokio::task::JoinHandle;
38
39#[derive(Debug, Clone)]
45pub struct ToolCallResult {
46 pub name: String,
47 pub output: String,
48 pub exit_code: i32,
49}
50
51#[derive(Clone, Default)]
57pub struct SessionOptions {
58 pub model: Option<String>,
60 pub agent_dirs: Vec<PathBuf>,
63 pub queue_config: Option<SessionQueueConfig>,
68 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
70 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
72 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
74 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
76 pub planning_enabled: bool,
78 pub goal_tracking: bool,
80 pub skill_dirs: Vec<PathBuf>,
83 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
85 pub memory_store: Option<Arc<dyn MemoryStore>>,
87 pub(crate) file_memory_dir: Option<PathBuf>,
89 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
91 pub session_id: Option<String>,
93 pub auto_save: bool,
95 pub max_parse_retries: Option<u32>,
98 pub tool_timeout_ms: Option<u64>,
101 pub circuit_breaker_threshold: Option<u32>,
105 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
111 pub auto_compact: bool,
113 pub auto_compact_threshold: Option<f32>,
116 pub continuation_enabled: Option<bool>,
119 pub max_continuation_turns: Option<u32>,
122 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
127 pub temperature: Option<f32>,
129 pub thinking_budget: Option<usize>,
131 pub prompt_slots: Option<SystemPromptSlots>,
137}
138
139impl std::fmt::Debug for SessionOptions {
140 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
141 f.debug_struct("SessionOptions")
142 .field("model", &self.model)
143 .field("agent_dirs", &self.agent_dirs)
144 .field("skill_dirs", &self.skill_dirs)
145 .field("queue_config", &self.queue_config)
146 .field("security_provider", &self.security_provider.is_some())
147 .field("context_providers", &self.context_providers.len())
148 .field("confirmation_manager", &self.confirmation_manager.is_some())
149 .field("permission_checker", &self.permission_checker.is_some())
150 .field("planning_enabled", &self.planning_enabled)
151 .field("goal_tracking", &self.goal_tracking)
152 .field(
153 "skill_registry",
154 &self
155 .skill_registry
156 .as_ref()
157 .map(|r| format!("{} skills", r.len())),
158 )
159 .field("memory_store", &self.memory_store.is_some())
160 .field("session_store", &self.session_store.is_some())
161 .field("session_id", &self.session_id)
162 .field("auto_save", &self.auto_save)
163 .field("max_parse_retries", &self.max_parse_retries)
164 .field("tool_timeout_ms", &self.tool_timeout_ms)
165 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
166 .field("sandbox_config", &self.sandbox_config)
167 .field("auto_compact", &self.auto_compact)
168 .field("auto_compact_threshold", &self.auto_compact_threshold)
169 .field("continuation_enabled", &self.continuation_enabled)
170 .field("max_continuation_turns", &self.max_continuation_turns)
171 .field("mcp_manager", &self.mcp_manager.is_some())
172 .field("temperature", &self.temperature)
173 .field("thinking_budget", &self.thinking_budget)
174 .field("prompt_slots", &self.prompt_slots.is_some())
175 .finish()
176 }
177}
178
179impl SessionOptions {
180 pub fn new() -> Self {
181 Self::default()
182 }
183
184 pub fn with_model(mut self, model: impl Into<String>) -> Self {
185 self.model = Some(model.into());
186 self
187 }
188
189 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
190 self.agent_dirs.push(dir.into());
191 self
192 }
193
194 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
195 self.queue_config = Some(config);
196 self
197 }
198
199 pub fn with_default_security(mut self) -> Self {
201 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
202 self
203 }
204
205 pub fn with_security_provider(
207 mut self,
208 provider: Arc<dyn crate::security::SecurityProvider>,
209 ) -> Self {
210 self.security_provider = Some(provider);
211 self
212 }
213
214 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
216 let config = crate::context::FileSystemContextConfig::new(root_path);
217 self.context_providers
218 .push(Arc::new(crate::context::FileSystemContextProvider::new(
219 config,
220 )));
221 self
222 }
223
224 pub fn with_context_provider(
226 mut self,
227 provider: Arc<dyn crate::context::ContextProvider>,
228 ) -> Self {
229 self.context_providers.push(provider);
230 self
231 }
232
233 pub fn with_confirmation_manager(
235 mut self,
236 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
237 ) -> Self {
238 self.confirmation_manager = Some(manager);
239 self
240 }
241
242 pub fn with_permission_checker(
244 mut self,
245 checker: Arc<dyn crate::permissions::PermissionChecker>,
246 ) -> Self {
247 self.permission_checker = Some(checker);
248 self
249 }
250
251 pub fn with_permissive_policy(self) -> Self {
258 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
259 }
260
261 pub fn with_planning(mut self, enabled: bool) -> Self {
263 self.planning_enabled = enabled;
264 self
265 }
266
267 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
269 self.goal_tracking = enabled;
270 self
271 }
272
273 pub fn with_builtin_skills(mut self) -> Self {
275 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
276 self
277 }
278
279 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
281 self.skill_registry = Some(registry);
282 self
283 }
284
285 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
288 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
289 self
290 }
291
292 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
294 let registry = self
295 .skill_registry
296 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
297 if let Err(e) = registry.load_from_dir(&dir) {
298 tracing::warn!(
299 dir = %dir.as_ref().display(),
300 error = %e,
301 "Failed to load skills from directory — continuing without them"
302 );
303 }
304 self.skill_registry = Some(registry);
305 self
306 }
307
308 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
310 self.memory_store = Some(store);
311 self
312 }
313
314 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
320 self.file_memory_dir = Some(dir.into());
321 self
322 }
323
324 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
326 self.session_store = Some(store);
327 self
328 }
329
330 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
332 let dir = dir.into();
333 match tokio::runtime::Handle::try_current() {
334 Ok(handle) => {
335 match tokio::task::block_in_place(|| {
336 handle.block_on(crate::store::FileSessionStore::new(dir))
337 }) {
338 Ok(store) => {
339 self.session_store =
340 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
341 }
342 Err(e) => {
343 tracing::warn!("Failed to create file session store: {}", e);
344 }
345 }
346 }
347 Err(_) => {
348 tracing::warn!(
349 "No async runtime available for file session store — persistence disabled"
350 );
351 }
352 }
353 self
354 }
355
356 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
358 self.session_id = Some(id.into());
359 self
360 }
361
362 pub fn with_auto_save(mut self, enabled: bool) -> Self {
364 self.auto_save = enabled;
365 self
366 }
367
368 pub fn with_parse_retries(mut self, max: u32) -> Self {
374 self.max_parse_retries = Some(max);
375 self
376 }
377
378 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
384 self.tool_timeout_ms = Some(timeout_ms);
385 self
386 }
387
388 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
394 self.circuit_breaker_threshold = Some(threshold);
395 self
396 }
397
398 pub fn with_resilience_defaults(self) -> Self {
404 self.with_parse_retries(2)
405 .with_tool_timeout(120_000)
406 .with_circuit_breaker(3)
407 }
408
409 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
428 self.sandbox_config = Some(config);
429 self
430 }
431
432 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
437 self.auto_compact = enabled;
438 self
439 }
440
441 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
443 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
444 self
445 }
446
447 pub fn with_continuation(mut self, enabled: bool) -> Self {
452 self.continuation_enabled = Some(enabled);
453 self
454 }
455
456 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
458 self.max_continuation_turns = Some(turns);
459 self
460 }
461
462 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
467 self.mcp_manager = Some(manager);
468 self
469 }
470
471 pub fn with_temperature(mut self, temperature: f32) -> Self {
472 self.temperature = Some(temperature);
473 self
474 }
475
476 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
477 self.thinking_budget = Some(budget);
478 self
479 }
480
481 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
486 self.prompt_slots = Some(slots);
487 self
488 }
489}
490
491pub struct Agent {
500 llm_client: Arc<dyn LlmClient>,
501 code_config: CodeConfig,
502 config: AgentConfig,
503}
504
505impl std::fmt::Debug for Agent {
506 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
507 f.debug_struct("Agent").finish()
508 }
509}
510
511impl Agent {
512 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
516 let source = config_source.into();
517
518 let expanded = if let Some(rest) = source.strip_prefix("~/") {
520 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
521 if let Some(home) = home {
522 format!("{}/{}", home.to_string_lossy(), rest)
523 } else {
524 source.clone()
525 }
526 } else {
527 source.clone()
528 };
529
530 let path = Path::new(&expanded);
531
532 let config = if path.extension().is_some() && path.exists() {
533 CodeConfig::from_file(path)
534 .with_context(|| format!("Failed to load config: {}", path.display()))?
535 } else {
536 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
538 };
539
540 Self::from_config(config).await
541 }
542
543 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
548 Self::new(config_source).await
549 }
550
551 pub async fn from_config(config: CodeConfig) -> Result<Self> {
553 let llm_config = config
554 .default_llm_config()
555 .context("default_model must be set in 'provider/model' format with a valid API key")?;
556 let llm_client = crate::llm::create_client_with_config(llm_config);
557
558 let agent_config = AgentConfig {
559 max_tool_rounds: config
560 .max_tool_rounds
561 .unwrap_or(AgentConfig::default().max_tool_rounds),
562 ..AgentConfig::default()
563 };
564
565 let mut agent = Agent {
566 llm_client,
567 code_config: config,
568 config: agent_config,
569 };
570
571 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
573 for dir in &agent.code_config.skill_dirs.clone() {
574 if let Err(e) = registry.load_from_dir(dir) {
575 tracing::warn!(
576 dir = %dir.display(),
577 error = %e,
578 "Failed to load skills from directory — skipping"
579 );
580 }
581 }
582 agent.config.skill_registry = Some(registry);
583
584 Ok(agent)
585 }
586
587 pub fn session(
592 &self,
593 workspace: impl Into<String>,
594 options: Option<SessionOptions>,
595 ) -> Result<AgentSession> {
596 let opts = options.unwrap_or_default();
597
598 let llm_client = if let Some(ref model) = opts.model {
599 let (provider_name, model_id) = model
600 .split_once('/')
601 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
602
603 let mut llm_config = self
604 .code_config
605 .llm_config(provider_name, model_id)
606 .with_context(|| {
607 format!("provider '{provider_name}' or model '{model_id}' not found in config")
608 })?;
609
610 if let Some(temp) = opts.temperature {
611 llm_config = llm_config.with_temperature(temp);
612 }
613 if let Some(budget) = opts.thinking_budget {
614 llm_config = llm_config.with_thinking_budget(budget);
615 }
616
617 crate::llm::create_client_with_config(llm_config)
618 } else {
619 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
620 tracing::warn!(
621 "temperature/thinking_budget set without model override — these will be ignored. \
622 Use with_model() to apply LLM parameter overrides."
623 );
624 }
625 self.llm_client.clone()
626 };
627
628 self.build_session(workspace.into(), llm_client, &opts)
629 }
630
631 pub fn resume_session(
639 &self,
640 session_id: &str,
641 options: SessionOptions,
642 ) -> Result<AgentSession> {
643 let store = options.session_store.as_ref().ok_or_else(|| {
644 crate::error::CodeError::Session(
645 "resume_session requires a session_store in SessionOptions".to_string(),
646 )
647 })?;
648
649 let data = match tokio::runtime::Handle::try_current() {
651 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
652 .map_err(|e| {
653 crate::error::CodeError::Session(format!(
654 "Failed to load session {}: {}",
655 session_id, e
656 ))
657 })?,
658 Err(_) => {
659 return Err(crate::error::CodeError::Session(
660 "No async runtime available for session resume".to_string(),
661 ))
662 }
663 };
664
665 let data = data.ok_or_else(|| {
666 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
667 })?;
668
669 let mut opts = options;
671 opts.session_id = Some(data.id.clone());
672
673 let llm_client = if let Some(ref model) = opts.model {
674 let (provider_name, model_id) = model
675 .split_once('/')
676 .context("model format must be 'provider/model'")?;
677 let llm_config = self
678 .code_config
679 .llm_config(provider_name, model_id)
680 .with_context(|| {
681 format!("provider '{provider_name}' or model '{model_id}' not found")
682 })?;
683 crate::llm::create_client_with_config(llm_config)
684 } else {
685 self.llm_client.clone()
686 };
687
688 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
689
690 *session.history.write().unwrap() = data.messages;
692
693 Ok(session)
694 }
695
696 fn build_session(
697 &self,
698 workspace: String,
699 llm_client: Arc<dyn LlmClient>,
700 opts: &SessionOptions,
701 ) -> Result<AgentSession> {
702 let canonical =
703 std::fs::canonicalize(&workspace).unwrap_or_else(|_| PathBuf::from(&workspace));
704
705 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
706
707 if let Some(ref mcp) = opts.mcp_manager {
709 let mcp_clone = Arc::clone(mcp);
710 let all_tools = tokio::task::block_in_place(|| {
711 tokio::runtime::Handle::current().block_on(mcp_clone.get_all_tools())
712 });
713 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
715 std::collections::HashMap::new();
716 for (server, tool) in all_tools {
717 by_server.entry(server).or_default().push(tool);
718 }
719 for (server_name, tools) in by_server {
720 for tool in
721 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
722 {
723 tool_executor.register_dynamic_tool(tool);
724 }
725 }
726 }
727
728 let tool_defs = tool_executor.definitions();
729
730 let mut prompt_slots = opts
732 .prompt_slots
733 .clone()
734 .unwrap_or_else(|| self.config.prompt_slots.clone());
735
736 let base_registry = self
740 .config
741 .skill_registry
742 .as_deref()
743 .map(|r| r.fork())
744 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
745 if let Some(ref r) = opts.skill_registry {
747 for skill in r.all() {
748 base_registry.register_unchecked(skill);
749 }
750 }
751 for dir in &opts.skill_dirs {
753 if let Err(e) = base_registry.load_from_dir(dir) {
754 tracing::warn!(
755 dir = %dir.display(),
756 error = %e,
757 "Failed to load session skill dir — skipping"
758 );
759 }
760 }
761 let effective_registry = Arc::new(base_registry);
762
763 let skill_prompt = effective_registry.to_system_prompt();
765 if !skill_prompt.is_empty() {
766 prompt_slots.extra = match prompt_slots.extra {
767 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
768 None => Some(skill_prompt),
769 };
770 }
771
772 let mut init_warning: Option<String> = None;
774 let memory = {
775 let store = if let Some(ref store) = opts.memory_store {
776 Some(Arc::clone(store))
777 } else if let Some(ref dir) = opts.file_memory_dir {
778 match tokio::runtime::Handle::try_current() {
779 Ok(handle) => {
780 let dir = dir.clone();
781 match tokio::task::block_in_place(|| {
782 handle.block_on(FileMemoryStore::new(dir))
783 }) {
784 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
785 Err(e) => {
786 let msg = format!("Failed to create file memory store: {}", e);
787 tracing::warn!("{}", msg);
788 init_warning = Some(msg);
789 None
790 }
791 }
792 }
793 Err(_) => {
794 let msg =
795 "No async runtime available for file memory store — memory disabled"
796 .to_string();
797 tracing::warn!("{}", msg);
798 init_warning = Some(msg);
799 None
800 }
801 }
802 } else {
803 None
804 };
805 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
806 };
807
808 let base = self.config.clone();
809 let config = AgentConfig {
810 prompt_slots,
811 tools: tool_defs,
812 security_provider: opts.security_provider.clone(),
813 permission_checker: opts.permission_checker.clone(),
814 confirmation_manager: opts.confirmation_manager.clone(),
815 context_providers: opts.context_providers.clone(),
816 planning_enabled: opts.planning_enabled,
817 goal_tracking: opts.goal_tracking,
818 skill_registry: Some(effective_registry),
819 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
820 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
821 circuit_breaker_threshold: opts
822 .circuit_breaker_threshold
823 .unwrap_or(base.circuit_breaker_threshold),
824 auto_compact: opts.auto_compact,
825 auto_compact_threshold: opts
826 .auto_compact_threshold
827 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
828 max_context_tokens: base.max_context_tokens,
829 llm_client: Some(Arc::clone(&llm_client)),
830 memory: memory.clone(),
831 continuation_enabled: opts
832 .continuation_enabled
833 .unwrap_or(base.continuation_enabled),
834 max_continuation_turns: opts
835 .max_continuation_turns
836 .unwrap_or(base.max_continuation_turns),
837 ..base
838 };
839
840 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
843 let command_queue = if let Some(ref queue_config) = opts.queue_config {
844 let session_id = uuid::Uuid::new_v4().to_string();
845 let rt = tokio::runtime::Handle::try_current();
846
847 match rt {
848 Ok(handle) => {
849 let queue = tokio::task::block_in_place(|| {
851 handle.block_on(SessionLaneQueue::new(
852 &session_id,
853 queue_config.clone(),
854 agent_event_tx.clone(),
855 ))
856 });
857 match queue {
858 Ok(q) => {
859 let q = Arc::new(q);
861 let q2 = Arc::clone(&q);
862 tokio::task::block_in_place(|| {
863 handle.block_on(async { q2.start().await.ok() })
864 });
865 Some(q)
866 }
867 Err(e) => {
868 tracing::warn!("Failed to create session lane queue: {}", e);
869 None
870 }
871 }
872 }
873 Err(_) => {
874 tracing::warn!(
875 "No async runtime available for queue creation — queue disabled"
876 );
877 None
878 }
879 }
880 } else {
881 None
882 };
883
884 let mut tool_context = ToolContext::new(canonical.clone());
886 if let Some(ref search_config) = self.code_config.search {
887 tool_context = tool_context.with_search_config(search_config.clone());
888 }
889 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
890
891 #[cfg(feature = "sandbox")]
893 if let Some(ref sandbox_cfg) = opts.sandbox_config {
894 let handle: Arc<dyn crate::sandbox::BashSandbox> =
895 Arc::new(crate::sandbox::BoxSandboxHandle::new(
896 sandbox_cfg.clone(),
897 canonical.display().to_string(),
898 ));
899 tool_executor.registry().set_sandbox(Arc::clone(&handle));
902 tool_context = tool_context.with_sandbox(handle);
903 }
904 #[cfg(not(feature = "sandbox"))]
905 if opts.sandbox_config.is_some() {
906 tracing::warn!(
907 "sandbox_config is set but the `sandbox` Cargo feature is not enabled \
908 — bash commands will run locally"
909 );
910 }
911
912 let session_id = opts
913 .session_id
914 .clone()
915 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
916
917 let session_store = if opts.session_store.is_some() {
919 opts.session_store.clone()
920 } else if let Some(ref dir) = self.code_config.sessions_dir {
921 match tokio::runtime::Handle::try_current() {
922 Ok(handle) => {
923 let dir = dir.clone();
924 match tokio::task::block_in_place(|| {
925 handle.block_on(crate::store::FileSessionStore::new(dir))
926 }) {
927 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
928 Err(e) => {
929 tracing::warn!(
930 "Failed to create session store from sessions_dir: {}",
931 e
932 );
933 None
934 }
935 }
936 }
937 Err(_) => {
938 tracing::warn!(
939 "No async runtime for sessions_dir store — persistence disabled"
940 );
941 None
942 }
943 }
944 } else {
945 None
946 };
947
948 Ok(AgentSession {
949 llm_client,
950 tool_executor,
951 tool_context,
952 memory: config.memory.clone(),
953 config,
954 workspace: canonical,
955 session_id,
956 history: RwLock::new(Vec::new()),
957 command_queue,
958 session_store,
959 auto_save: opts.auto_save,
960 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
961 init_warning,
962 command_registry: CommandRegistry::new(),
963 model_name: opts
964 .model
965 .clone()
966 .or_else(|| self.code_config.default_model.clone())
967 .unwrap_or_else(|| "unknown".to_string()),
968 })
969 }
970}
971
972pub struct AgentSession {
981 llm_client: Arc<dyn LlmClient>,
982 tool_executor: Arc<ToolExecutor>,
983 tool_context: ToolContext,
984 config: AgentConfig,
985 workspace: PathBuf,
986 session_id: String,
988 history: RwLock<Vec<Message>>,
990 command_queue: Option<Arc<SessionLaneQueue>>,
992 memory: Option<Arc<crate::memory::AgentMemory>>,
994 session_store: Option<Arc<dyn crate::store::SessionStore>>,
996 auto_save: bool,
998 hook_engine: Arc<crate::hooks::HookEngine>,
1000 init_warning: Option<String>,
1002 command_registry: CommandRegistry,
1004 model_name: String,
1006}
1007
1008impl std::fmt::Debug for AgentSession {
1009 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1010 f.debug_struct("AgentSession")
1011 .field("session_id", &self.session_id)
1012 .field("workspace", &self.workspace.display().to_string())
1013 .field("auto_save", &self.auto_save)
1014 .finish()
1015 }
1016}
1017
1018impl AgentSession {
1019 fn build_agent_loop(&self) -> AgentLoop {
1023 let mut config = self.config.clone();
1024 config.hook_engine =
1025 Some(Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>);
1026 let mut agent_loop = AgentLoop::new(
1027 self.llm_client.clone(),
1028 self.tool_executor.clone(),
1029 self.tool_context.clone(),
1030 config,
1031 );
1032 if let Some(ref queue) = self.command_queue {
1033 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1034 }
1035 agent_loop
1036 }
1037
1038 fn build_command_context(&self) -> CommandContext {
1040 let history = self.history.read().unwrap();
1041
1042 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1044
1045 let mut mcp_map: std::collections::HashMap<String, usize> =
1047 std::collections::HashMap::new();
1048 for name in &tool_names {
1049 if let Some(rest) = name.strip_prefix("mcp__") {
1050 if let Some((server, _)) = rest.split_once("__") {
1051 *mcp_map.entry(server.to_string()).or_default() += 1;
1052 }
1053 }
1054 }
1055 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1056 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1057
1058 CommandContext {
1059 session_id: self.session_id.clone(),
1060 workspace: self.workspace.display().to_string(),
1061 model: self.model_name.clone(),
1062 history_len: history.len(),
1063 total_tokens: 0,
1064 total_cost: 0.0,
1065 tool_names,
1066 mcp_servers,
1067 }
1068 }
1069
1070 pub fn command_registry(&self) -> &CommandRegistry {
1072 &self.command_registry
1073 }
1074
1075 pub fn register_command(&mut self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1077 self.command_registry.register(cmd);
1078 }
1079
1080 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1089 if CommandRegistry::is_command(prompt) {
1091 let ctx = self.build_command_context();
1092 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1093 return Ok(AgentResult {
1094 text: output.text,
1095 messages: history
1096 .map(|h| h.to_vec())
1097 .unwrap_or_else(|| self.history.read().unwrap().clone()),
1098 tool_calls_count: 0,
1099 usage: crate::llm::TokenUsage::default(),
1100 });
1101 }
1102 }
1103
1104 if let Some(ref w) = self.init_warning {
1105 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1106 }
1107 let agent_loop = self.build_agent_loop();
1108
1109 let use_internal = history.is_none();
1110 let effective_history = match history {
1111 Some(h) => h.to_vec(),
1112 None => self.history.read().unwrap().clone(),
1113 };
1114
1115 let result = agent_loop.execute(&effective_history, prompt, None).await?;
1116
1117 if use_internal {
1120 *self.history.write().unwrap() = result.messages.clone();
1121
1122 if self.auto_save {
1124 if let Err(e) = self.save().await {
1125 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1126 }
1127 }
1128 }
1129
1130 Ok(result)
1131 }
1132
1133 pub async fn send_with_attachments(
1138 &self,
1139 prompt: &str,
1140 attachments: &[crate::llm::Attachment],
1141 history: Option<&[Message]>,
1142 ) -> Result<AgentResult> {
1143 let use_internal = history.is_none();
1147 let mut effective_history = match history {
1148 Some(h) => h.to_vec(),
1149 None => self.history.read().unwrap().clone(),
1150 };
1151 effective_history.push(Message::user_with_attachments(prompt, attachments));
1152
1153 let agent_loop = self.build_agent_loop();
1154 let result = agent_loop
1155 .execute_from_messages(effective_history, None, None)
1156 .await?;
1157
1158 if use_internal {
1159 *self.history.write().unwrap() = result.messages.clone();
1160 if self.auto_save {
1161 if let Err(e) = self.save().await {
1162 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1163 }
1164 }
1165 }
1166
1167 Ok(result)
1168 }
1169
1170 pub async fn stream_with_attachments(
1175 &self,
1176 prompt: &str,
1177 attachments: &[crate::llm::Attachment],
1178 history: Option<&[Message]>,
1179 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1180 let (tx, rx) = mpsc::channel(256);
1181 let mut effective_history = match history {
1182 Some(h) => h.to_vec(),
1183 None => self.history.read().unwrap().clone(),
1184 };
1185 effective_history.push(Message::user_with_attachments(prompt, attachments));
1186
1187 let agent_loop = self.build_agent_loop();
1188 let handle = tokio::spawn(async move {
1189 let _ = agent_loop
1190 .execute_from_messages(effective_history, None, Some(tx))
1191 .await;
1192 });
1193
1194 Ok((rx, handle))
1195 }
1196
1197 pub async fn stream(
1207 &self,
1208 prompt: &str,
1209 history: Option<&[Message]>,
1210 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
1211 if CommandRegistry::is_command(prompt) {
1213 let ctx = self.build_command_context();
1214 if let Some(output) = self.command_registry.dispatch(prompt, &ctx) {
1215 let (tx, rx) = mpsc::channel(256);
1216 let handle = tokio::spawn(async move {
1217 let _ = tx
1218 .send(AgentEvent::TextDelta {
1219 text: output.text.clone(),
1220 })
1221 .await;
1222 let _ = tx
1223 .send(AgentEvent::End {
1224 text: output.text.clone(),
1225 usage: crate::llm::TokenUsage::default(),
1226 })
1227 .await;
1228 });
1229 return Ok((rx, handle));
1230 }
1231 }
1232
1233 let (tx, rx) = mpsc::channel(256);
1234 let agent_loop = self.build_agent_loop();
1235 let effective_history = match history {
1236 Some(h) => h.to_vec(),
1237 None => self.history.read().unwrap().clone(),
1238 };
1239 let prompt = prompt.to_string();
1240
1241 let handle = tokio::spawn(async move {
1242 let _ = agent_loop
1243 .execute(&effective_history, &prompt, Some(tx))
1244 .await;
1245 });
1246
1247 Ok((rx, handle))
1248 }
1249
1250 pub fn history(&self) -> Vec<Message> {
1252 self.history.read().unwrap().clone()
1253 }
1254
1255 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
1257 self.memory.as_ref()
1258 }
1259
1260 pub fn id(&self) -> &str {
1262 &self.session_id
1263 }
1264
1265 pub fn workspace(&self) -> &std::path::Path {
1267 &self.workspace
1268 }
1269
1270 pub fn init_warning(&self) -> Option<&str> {
1272 self.init_warning.as_deref()
1273 }
1274
1275 pub fn session_id(&self) -> &str {
1277 &self.session_id
1278 }
1279
1280 pub fn register_hook(&self, hook: crate::hooks::Hook) {
1286 self.hook_engine.register(hook);
1287 }
1288
1289 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
1291 self.hook_engine.unregister(hook_id)
1292 }
1293
1294 pub fn register_hook_handler(
1296 &self,
1297 hook_id: &str,
1298 handler: Arc<dyn crate::hooks::HookHandler>,
1299 ) {
1300 self.hook_engine.register_handler(hook_id, handler);
1301 }
1302
1303 pub fn unregister_hook_handler(&self, hook_id: &str) {
1305 self.hook_engine.unregister_handler(hook_id);
1306 }
1307
1308 pub fn hook_count(&self) -> usize {
1310 self.hook_engine.hook_count()
1311 }
1312
1313 pub async fn save(&self) -> Result<()> {
1317 let store = match &self.session_store {
1318 Some(s) => s,
1319 None => return Ok(()),
1320 };
1321
1322 let history = self.history.read().unwrap().clone();
1323 let now = chrono::Utc::now().timestamp();
1324
1325 let data = crate::store::SessionData {
1326 id: self.session_id.clone(),
1327 config: crate::session::SessionConfig {
1328 name: String::new(),
1329 workspace: self.workspace.display().to_string(),
1330 system_prompt: Some(self.config.prompt_slots.build()),
1331 max_context_length: 200_000,
1332 auto_compact: false,
1333 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
1334 storage_type: crate::config::StorageBackend::File,
1335 queue_config: None,
1336 confirmation_policy: None,
1337 permission_policy: None,
1338 parent_id: None,
1339 security_config: None,
1340 hook_engine: None,
1341 planning_enabled: self.config.planning_enabled,
1342 goal_tracking: self.config.goal_tracking,
1343 },
1344 state: crate::session::SessionState::Active,
1345 messages: history,
1346 context_usage: crate::session::ContextUsage::default(),
1347 total_usage: crate::llm::TokenUsage::default(),
1348 total_cost: 0.0,
1349 model_name: None,
1350 cost_records: Vec::new(),
1351 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1352 thinking_enabled: false,
1353 thinking_budget: None,
1354 created_at: now,
1355 updated_at: now,
1356 llm_config: None,
1357 tasks: Vec::new(),
1358 parent_id: None,
1359 };
1360
1361 store.save(&data).await?;
1362 tracing::debug!("Session {} saved", self.session_id);
1363 Ok(())
1364 }
1365
1366 pub async fn read_file(&self, path: &str) -> Result<String> {
1368 let args = serde_json::json!({ "file_path": path });
1369 let result = self.tool_executor.execute("read", &args).await?;
1370 Ok(result.output)
1371 }
1372
1373 pub async fn bash(&self, command: &str) -> Result<String> {
1378 let args = serde_json::json!({ "command": command });
1379 let result = self
1380 .tool_executor
1381 .execute_with_context("bash", &args, &self.tool_context)
1382 .await?;
1383 Ok(result.output)
1384 }
1385
1386 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
1388 let args = serde_json::json!({ "pattern": pattern });
1389 let result = self.tool_executor.execute("glob", &args).await?;
1390 let files: Vec<String> = result
1391 .output
1392 .lines()
1393 .filter(|l| !l.is_empty())
1394 .map(|l| l.to_string())
1395 .collect();
1396 Ok(files)
1397 }
1398
1399 pub async fn grep(&self, pattern: &str) -> Result<String> {
1401 let args = serde_json::json!({ "pattern": pattern });
1402 let result = self.tool_executor.execute("grep", &args).await?;
1403 Ok(result.output)
1404 }
1405
1406 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
1408 let result = self.tool_executor.execute(name, &args).await?;
1409 Ok(ToolCallResult {
1410 name: name.to_string(),
1411 output: result.output,
1412 exit_code: result.exit_code,
1413 })
1414 }
1415
1416 pub fn has_queue(&self) -> bool {
1422 self.command_queue.is_some()
1423 }
1424
1425 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
1429 if let Some(ref queue) = self.command_queue {
1430 queue.set_lane_handler(lane, config).await;
1431 }
1432 }
1433
1434 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
1438 if let Some(ref queue) = self.command_queue {
1439 queue.complete_external_task(task_id, result).await
1440 } else {
1441 false
1442 }
1443 }
1444
1445 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
1447 if let Some(ref queue) = self.command_queue {
1448 queue.pending_external_tasks().await
1449 } else {
1450 Vec::new()
1451 }
1452 }
1453
1454 pub async fn queue_stats(&self) -> SessionQueueStats {
1456 if let Some(ref queue) = self.command_queue {
1457 queue.stats().await
1458 } else {
1459 SessionQueueStats::default()
1460 }
1461 }
1462
1463 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
1465 if let Some(ref queue) = self.command_queue {
1466 queue.metrics_snapshot().await
1467 } else {
1468 None
1469 }
1470 }
1471
1472 pub async fn submit(
1478 &self,
1479 lane: SessionLane,
1480 command: Box<dyn crate::queue::SessionCommand>,
1481 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
1482 let queue = self
1483 .command_queue
1484 .as_ref()
1485 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1486 Ok(queue.submit(lane, command).await)
1487 }
1488
1489 pub async fn submit_batch(
1496 &self,
1497 lane: SessionLane,
1498 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
1499 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>> {
1500 let queue = self
1501 .command_queue
1502 .as_ref()
1503 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
1504 Ok(queue.submit_batch(lane, commands).await)
1505 }
1506
1507 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
1509 if let Some(ref queue) = self.command_queue {
1510 queue.dead_letters().await
1511 } else {
1512 Vec::new()
1513 }
1514 }
1515
1516 pub async fn add_mcp_server(
1528 &self,
1529 manager: Arc<crate::mcp::manager::McpManager>,
1530 server_name: &str,
1531 ) -> crate::error::Result<usize> {
1532 manager.connect(server_name).await.map_err(|e| {
1533 crate::error::CodeError::Tool {
1534 tool: server_name.to_string(),
1535 message: format!("Failed to connect MCP server: {}", e),
1536 }
1537 })?;
1538
1539 let tools = manager.get_server_tools(server_name).await;
1540 let count = tools.len();
1541
1542 for tool in crate::mcp::tools::create_mcp_tools(server_name, tools, Arc::clone(&manager)) {
1543 self.tool_executor.register_dynamic_tool(tool);
1544 }
1545
1546 tracing::info!(
1547 session_id = %self.session_id,
1548 server = server_name,
1549 tools = count,
1550 "MCP server added to live session"
1551 );
1552
1553 Ok(count)
1554 }
1555}
1556
1557#[cfg(test)]
1562mod tests {
1563 use super::*;
1564 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
1565 use crate::store::SessionStore;
1566
1567 #[tokio::test]
1568 async fn test_session_submit_no_queue_returns_err() {
1569 let agent = Agent::from_config(test_config()).await.unwrap();
1570 let session = agent.session(".", None).unwrap();
1571 struct Noop;
1572 #[async_trait::async_trait]
1573 impl crate::queue::SessionCommand for Noop {
1574 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1575 Ok(serde_json::json!(null))
1576 }
1577 fn command_type(&self) -> &str { "noop" }
1578 }
1579 let result: anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> = session.submit(SessionLane::Query, Box::new(Noop)).await;
1580 assert!(result.is_err());
1581 assert!(result.unwrap_err().to_string().contains("No queue"));
1582 }
1583
1584 #[tokio::test]
1585 async fn test_session_submit_batch_no_queue_returns_err() {
1586 let agent = Agent::from_config(test_config()).await.unwrap();
1587 let session = agent.session(".", None).unwrap();
1588 struct Noop;
1589 #[async_trait::async_trait]
1590 impl crate::queue::SessionCommand for Noop {
1591 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
1592 Ok(serde_json::json!(null))
1593 }
1594 fn command_type(&self) -> &str { "noop" }
1595 }
1596 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
1597 let result: anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>> = session.submit_batch(SessionLane::Query, cmds).await;
1598 assert!(result.is_err());
1599 assert!(result.unwrap_err().to_string().contains("No queue"));
1600 }
1601
1602 fn test_config() -> CodeConfig {
1603 CodeConfig {
1604 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
1605 providers: vec![
1606 ProviderConfig {
1607 name: "anthropic".to_string(),
1608 api_key: Some("test-key".to_string()),
1609 base_url: None,
1610 models: vec![ModelConfig {
1611 id: "claude-sonnet-4-20250514".to_string(),
1612 name: "Claude Sonnet 4".to_string(),
1613 family: "claude-sonnet".to_string(),
1614 api_key: None,
1615 base_url: None,
1616 attachment: false,
1617 reasoning: false,
1618 tool_call: true,
1619 temperature: true,
1620 release_date: None,
1621 modalities: ModelModalities::default(),
1622 cost: Default::default(),
1623 limit: Default::default(),
1624 }],
1625 },
1626 ProviderConfig {
1627 name: "openai".to_string(),
1628 api_key: Some("test-openai-key".to_string()),
1629 base_url: None,
1630 models: vec![ModelConfig {
1631 id: "gpt-4o".to_string(),
1632 name: "GPT-4o".to_string(),
1633 family: "gpt-4".to_string(),
1634 api_key: None,
1635 base_url: None,
1636 attachment: false,
1637 reasoning: false,
1638 tool_call: true,
1639 temperature: true,
1640 release_date: None,
1641 modalities: ModelModalities::default(),
1642 cost: Default::default(),
1643 limit: Default::default(),
1644 }],
1645 },
1646 ],
1647 ..Default::default()
1648 }
1649 }
1650
1651 #[tokio::test]
1652 async fn test_from_config() {
1653 let agent = Agent::from_config(test_config()).await;
1654 assert!(agent.is_ok());
1655 }
1656
1657 #[tokio::test]
1658 async fn test_session_default() {
1659 let agent = Agent::from_config(test_config()).await.unwrap();
1660 let session = agent.session("/tmp/test-workspace", None);
1661 assert!(session.is_ok());
1662 let debug = format!("{:?}", session.unwrap());
1663 assert!(debug.contains("AgentSession"));
1664 }
1665
1666 #[tokio::test]
1667 async fn test_session_with_model_override() {
1668 let agent = Agent::from_config(test_config()).await.unwrap();
1669 let opts = SessionOptions::new().with_model("openai/gpt-4o");
1670 let session = agent.session("/tmp/test-workspace", Some(opts));
1671 assert!(session.is_ok());
1672 }
1673
1674 #[tokio::test]
1675 async fn test_session_with_invalid_model_format() {
1676 let agent = Agent::from_config(test_config()).await.unwrap();
1677 let opts = SessionOptions::new().with_model("gpt-4o");
1678 let session = agent.session("/tmp/test-workspace", Some(opts));
1679 assert!(session.is_err());
1680 }
1681
1682 #[tokio::test]
1683 async fn test_session_with_model_not_found() {
1684 let agent = Agent::from_config(test_config()).await.unwrap();
1685 let opts = SessionOptions::new().with_model("openai/nonexistent");
1686 let session = agent.session("/tmp/test-workspace", Some(opts));
1687 assert!(session.is_err());
1688 }
1689
1690 #[tokio::test]
1691 async fn test_new_with_hcl_string() {
1692 let hcl = r#"
1693 default_model = "anthropic/claude-sonnet-4-20250514"
1694 providers {
1695 name = "anthropic"
1696 api_key = "test-key"
1697 models {
1698 id = "claude-sonnet-4-20250514"
1699 name = "Claude Sonnet 4"
1700 }
1701 }
1702 "#;
1703 let agent = Agent::new(hcl).await;
1704 assert!(agent.is_ok());
1705 }
1706
1707 #[tokio::test]
1708 async fn test_create_alias_hcl() {
1709 let hcl = r#"
1710 default_model = "anthropic/claude-sonnet-4-20250514"
1711 providers {
1712 name = "anthropic"
1713 api_key = "test-key"
1714 models {
1715 id = "claude-sonnet-4-20250514"
1716 name = "Claude Sonnet 4"
1717 }
1718 }
1719 "#;
1720 let agent = Agent::create(hcl).await;
1721 assert!(agent.is_ok());
1722 }
1723
1724 #[tokio::test]
1725 async fn test_create_and_new_produce_same_result() {
1726 let hcl = r#"
1727 default_model = "anthropic/claude-sonnet-4-20250514"
1728 providers {
1729 name = "anthropic"
1730 api_key = "test-key"
1731 models {
1732 id = "claude-sonnet-4-20250514"
1733 name = "Claude Sonnet 4"
1734 }
1735 }
1736 "#;
1737 let agent_new = Agent::new(hcl).await;
1738 let agent_create = Agent::create(hcl).await;
1739 assert!(agent_new.is_ok());
1740 assert!(agent_create.is_ok());
1741
1742 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
1744 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
1745 assert!(session_new.is_ok());
1746 assert!(session_create.is_ok());
1747 }
1748
1749 #[test]
1750 fn test_from_config_requires_default_model() {
1751 let rt = tokio::runtime::Runtime::new().unwrap();
1752 let config = CodeConfig {
1753 providers: vec![ProviderConfig {
1754 name: "anthropic".to_string(),
1755 api_key: Some("test-key".to_string()),
1756 base_url: None,
1757 models: vec![],
1758 }],
1759 ..Default::default()
1760 };
1761 let result = rt.block_on(Agent::from_config(config));
1762 assert!(result.is_err());
1763 }
1764
1765 #[tokio::test]
1766 async fn test_history_empty_on_new_session() {
1767 let agent = Agent::from_config(test_config()).await.unwrap();
1768 let session = agent.session("/tmp/test-workspace", None).unwrap();
1769 assert!(session.history().is_empty());
1770 }
1771
1772 #[tokio::test]
1773 async fn test_session_options_with_agent_dir() {
1774 let opts = SessionOptions::new()
1775 .with_agent_dir("/tmp/agents")
1776 .with_agent_dir("/tmp/more-agents");
1777 assert_eq!(opts.agent_dirs.len(), 2);
1778 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
1779 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
1780 }
1781
1782 #[test]
1787 fn test_session_options_with_queue_config() {
1788 let qc = SessionQueueConfig::default().with_lane_features();
1789 let opts = SessionOptions::new().with_queue_config(qc.clone());
1790 assert!(opts.queue_config.is_some());
1791
1792 let config = opts.queue_config.unwrap();
1793 assert!(config.enable_dlq);
1794 assert!(config.enable_metrics);
1795 assert!(config.enable_alerts);
1796 assert_eq!(config.default_timeout_ms, Some(60_000));
1797 }
1798
1799 #[tokio::test(flavor = "multi_thread")]
1800 async fn test_session_with_queue_config() {
1801 let agent = Agent::from_config(test_config()).await.unwrap();
1802 let qc = SessionQueueConfig::default();
1803 let opts = SessionOptions::new().with_queue_config(qc);
1804 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
1805 assert!(session.is_ok());
1806 let session = session.unwrap();
1807 assert!(session.has_queue());
1808 }
1809
1810 #[tokio::test]
1811 async fn test_session_without_queue_config() {
1812 let agent = Agent::from_config(test_config()).await.unwrap();
1813 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
1814 assert!(!session.has_queue());
1815 }
1816
1817 #[tokio::test]
1818 async fn test_session_queue_stats_without_queue() {
1819 let agent = Agent::from_config(test_config()).await.unwrap();
1820 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
1821 let stats = session.queue_stats().await;
1822 assert_eq!(stats.total_pending, 0);
1824 assert_eq!(stats.total_active, 0);
1825 }
1826
1827 #[tokio::test(flavor = "multi_thread")]
1828 async fn test_session_queue_stats_with_queue() {
1829 let agent = Agent::from_config(test_config()).await.unwrap();
1830 let qc = SessionQueueConfig::default();
1831 let opts = SessionOptions::new().with_queue_config(qc);
1832 let session = agent
1833 .session("/tmp/test-workspace-qstats", Some(opts))
1834 .unwrap();
1835 let stats = session.queue_stats().await;
1836 assert_eq!(stats.total_pending, 0);
1838 assert_eq!(stats.total_active, 0);
1839 }
1840
1841 #[tokio::test(flavor = "multi_thread")]
1842 async fn test_session_pending_external_tasks_empty() {
1843 let agent = Agent::from_config(test_config()).await.unwrap();
1844 let qc = SessionQueueConfig::default();
1845 let opts = SessionOptions::new().with_queue_config(qc);
1846 let session = agent
1847 .session("/tmp/test-workspace-ext", Some(opts))
1848 .unwrap();
1849 let tasks = session.pending_external_tasks().await;
1850 assert!(tasks.is_empty());
1851 }
1852
1853 #[tokio::test(flavor = "multi_thread")]
1854 async fn test_session_dead_letters_empty() {
1855 let agent = Agent::from_config(test_config()).await.unwrap();
1856 let qc = SessionQueueConfig::default().with_dlq(Some(100));
1857 let opts = SessionOptions::new().with_queue_config(qc);
1858 let session = agent
1859 .session("/tmp/test-workspace-dlq", Some(opts))
1860 .unwrap();
1861 let dead = session.dead_letters().await;
1862 assert!(dead.is_empty());
1863 }
1864
1865 #[tokio::test(flavor = "multi_thread")]
1866 async fn test_session_queue_metrics_disabled() {
1867 let agent = Agent::from_config(test_config()).await.unwrap();
1868 let qc = SessionQueueConfig::default();
1870 let opts = SessionOptions::new().with_queue_config(qc);
1871 let session = agent
1872 .session("/tmp/test-workspace-nomet", Some(opts))
1873 .unwrap();
1874 let metrics = session.queue_metrics().await;
1875 assert!(metrics.is_none());
1876 }
1877
1878 #[tokio::test(flavor = "multi_thread")]
1879 async fn test_session_queue_metrics_enabled() {
1880 let agent = Agent::from_config(test_config()).await.unwrap();
1881 let qc = SessionQueueConfig::default().with_metrics();
1882 let opts = SessionOptions::new().with_queue_config(qc);
1883 let session = agent
1884 .session("/tmp/test-workspace-met", Some(opts))
1885 .unwrap();
1886 let metrics = session.queue_metrics().await;
1887 assert!(metrics.is_some());
1888 }
1889
1890 #[tokio::test(flavor = "multi_thread")]
1891 async fn test_session_set_lane_handler() {
1892 let agent = Agent::from_config(test_config()).await.unwrap();
1893 let qc = SessionQueueConfig::default();
1894 let opts = SessionOptions::new().with_queue_config(qc);
1895 let session = agent
1896 .session("/tmp/test-workspace-handler", Some(opts))
1897 .unwrap();
1898
1899 session
1901 .set_lane_handler(
1902 SessionLane::Execute,
1903 LaneHandlerConfig {
1904 mode: crate::queue::TaskHandlerMode::External,
1905 timeout_ms: 30_000,
1906 },
1907 )
1908 .await;
1909
1910 }
1913
1914 #[tokio::test(flavor = "multi_thread")]
1919 async fn test_session_has_id() {
1920 let agent = Agent::from_config(test_config()).await.unwrap();
1921 let session = agent.session("/tmp/test-ws-id", None).unwrap();
1922 assert!(!session.session_id().is_empty());
1924 assert_eq!(session.session_id().len(), 36); }
1926
1927 #[tokio::test(flavor = "multi_thread")]
1928 async fn test_session_explicit_id() {
1929 let agent = Agent::from_config(test_config()).await.unwrap();
1930 let opts = SessionOptions::new().with_session_id("my-session-42");
1931 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
1932 assert_eq!(session.session_id(), "my-session-42");
1933 }
1934
1935 #[tokio::test(flavor = "multi_thread")]
1936 async fn test_session_save_no_store() {
1937 let agent = Agent::from_config(test_config()).await.unwrap();
1938 let session = agent.session("/tmp/test-ws-save", None).unwrap();
1939 session.save().await.unwrap();
1941 }
1942
1943 #[tokio::test(flavor = "multi_thread")]
1944 async fn test_session_save_and_load() {
1945 let store = Arc::new(crate::store::MemorySessionStore::new());
1946 let agent = Agent::from_config(test_config()).await.unwrap();
1947
1948 let opts = SessionOptions::new()
1949 .with_session_store(store.clone())
1950 .with_session_id("persist-test");
1951 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
1952
1953 session.save().await.unwrap();
1955
1956 assert!(store.exists("persist-test").await.unwrap());
1958
1959 let data = store.load("persist-test").await.unwrap().unwrap();
1960 assert_eq!(data.id, "persist-test");
1961 assert!(data.messages.is_empty());
1962 }
1963
1964 #[tokio::test(flavor = "multi_thread")]
1965 async fn test_session_save_with_history() {
1966 let store = Arc::new(crate::store::MemorySessionStore::new());
1967 let agent = Agent::from_config(test_config()).await.unwrap();
1968
1969 let opts = SessionOptions::new()
1970 .with_session_store(store.clone())
1971 .with_session_id("history-test");
1972 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
1973
1974 {
1976 let mut h = session.history.write().unwrap();
1977 h.push(Message::user("Hello"));
1978 h.push(Message::user("How are you?"));
1979 }
1980
1981 session.save().await.unwrap();
1982
1983 let data = store.load("history-test").await.unwrap().unwrap();
1984 assert_eq!(data.messages.len(), 2);
1985 }
1986
1987 #[tokio::test(flavor = "multi_thread")]
1988 async fn test_resume_session() {
1989 let store = Arc::new(crate::store::MemorySessionStore::new());
1990 let agent = Agent::from_config(test_config()).await.unwrap();
1991
1992 let opts = SessionOptions::new()
1994 .with_session_store(store.clone())
1995 .with_session_id("resume-test");
1996 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
1997 {
1998 let mut h = session.history.write().unwrap();
1999 h.push(Message::user("What is Rust?"));
2000 h.push(Message::user("Tell me more"));
2001 }
2002 session.save().await.unwrap();
2003
2004 let opts2 = SessionOptions::new().with_session_store(store.clone());
2006 let resumed = agent.resume_session("resume-test", opts2).unwrap();
2007
2008 assert_eq!(resumed.session_id(), "resume-test");
2009 let history = resumed.history();
2010 assert_eq!(history.len(), 2);
2011 assert_eq!(history[0].text(), "What is Rust?");
2012 }
2013
2014 #[tokio::test(flavor = "multi_thread")]
2015 async fn test_resume_session_not_found() {
2016 let store = Arc::new(crate::store::MemorySessionStore::new());
2017 let agent = Agent::from_config(test_config()).await.unwrap();
2018
2019 let opts = SessionOptions::new().with_session_store(store.clone());
2020 let result = agent.resume_session("nonexistent", opts);
2021 assert!(result.is_err());
2022 assert!(result.unwrap_err().to_string().contains("not found"));
2023 }
2024
2025 #[tokio::test(flavor = "multi_thread")]
2026 async fn test_resume_session_no_store() {
2027 let agent = Agent::from_config(test_config()).await.unwrap();
2028 let opts = SessionOptions::new();
2029 let result = agent.resume_session("any-id", opts);
2030 assert!(result.is_err());
2031 assert!(result.unwrap_err().to_string().contains("session_store"));
2032 }
2033
2034 #[tokio::test(flavor = "multi_thread")]
2035 async fn test_file_session_store_persistence() {
2036 let dir = tempfile::TempDir::new().unwrap();
2037 let store = Arc::new(
2038 crate::store::FileSessionStore::new(dir.path())
2039 .await
2040 .unwrap(),
2041 );
2042 let agent = Agent::from_config(test_config()).await.unwrap();
2043
2044 let opts = SessionOptions::new()
2046 .with_session_store(store.clone())
2047 .with_session_id("file-persist");
2048 let session = agent
2049 .session("/tmp/test-ws-file-persist", Some(opts))
2050 .unwrap();
2051 {
2052 let mut h = session.history.write().unwrap();
2053 h.push(Message::user("test message"));
2054 }
2055 session.save().await.unwrap();
2056
2057 let store2 = Arc::new(
2059 crate::store::FileSessionStore::new(dir.path())
2060 .await
2061 .unwrap(),
2062 );
2063 let data = store2.load("file-persist").await.unwrap().unwrap();
2064 assert_eq!(data.messages.len(), 1);
2065 }
2066
2067 #[tokio::test(flavor = "multi_thread")]
2068 async fn test_session_options_builders() {
2069 let opts = SessionOptions::new()
2070 .with_session_id("test-id")
2071 .with_auto_save(true);
2072 assert_eq!(opts.session_id, Some("test-id".to_string()));
2073 assert!(opts.auto_save);
2074 }
2075
2076 #[test]
2081 fn test_session_options_with_sandbox_sets_config() {
2082 use crate::sandbox::SandboxConfig;
2083 let cfg = SandboxConfig {
2084 image: "ubuntu:22.04".into(),
2085 memory_mb: 1024,
2086 ..SandboxConfig::default()
2087 };
2088 let opts = SessionOptions::new().with_sandbox(cfg);
2089 assert!(opts.sandbox_config.is_some());
2090 let sc = opts.sandbox_config.unwrap();
2091 assert_eq!(sc.image, "ubuntu:22.04");
2092 assert_eq!(sc.memory_mb, 1024);
2093 }
2094
2095 #[test]
2096 fn test_session_options_default_has_no_sandbox() {
2097 let opts = SessionOptions::default();
2098 assert!(opts.sandbox_config.is_none());
2099 }
2100
2101 #[tokio::test]
2102 async fn test_session_debug_includes_sandbox_config() {
2103 use crate::sandbox::SandboxConfig;
2104 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
2105 let debug = format!("{:?}", opts);
2106 assert!(debug.contains("sandbox_config"));
2107 }
2108
2109 #[tokio::test]
2110 async fn test_session_build_with_sandbox_config_no_feature_warn() {
2111 let agent = Agent::from_config(test_config()).await.unwrap();
2114 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
2115 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
2117 assert!(session.is_ok());
2118 }
2119
2120 #[tokio::test(flavor = "multi_thread")]
2125 async fn test_session_with_memory_store() {
2126 use a3s_memory::InMemoryStore;
2127 let store = Arc::new(InMemoryStore::new());
2128 let agent = Agent::from_config(test_config()).await.unwrap();
2129 let opts = SessionOptions::new().with_memory(store);
2130 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
2131 assert!(session.memory().is_some());
2132 }
2133
2134 #[tokio::test(flavor = "multi_thread")]
2135 async fn test_session_without_memory_store() {
2136 let agent = Agent::from_config(test_config()).await.unwrap();
2137 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
2138 assert!(session.memory().is_none());
2139 }
2140
2141 #[tokio::test(flavor = "multi_thread")]
2142 async fn test_session_memory_wired_into_config() {
2143 use a3s_memory::InMemoryStore;
2144 let store = Arc::new(InMemoryStore::new());
2145 let agent = Agent::from_config(test_config()).await.unwrap();
2146 let opts = SessionOptions::new().with_memory(store);
2147 let session = agent
2148 .session("/tmp/test-ws-mem-config", Some(opts))
2149 .unwrap();
2150 assert!(session.memory().is_some());
2152 }
2153
2154 #[tokio::test(flavor = "multi_thread")]
2155 async fn test_session_with_file_memory() {
2156 let dir = tempfile::TempDir::new().unwrap();
2157 let agent = Agent::from_config(test_config()).await.unwrap();
2158 let opts = SessionOptions::new().with_file_memory(dir.path());
2159 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
2160 assert!(session.memory().is_some());
2161 }
2162
2163 #[tokio::test(flavor = "multi_thread")]
2164 async fn test_memory_remember_and_recall() {
2165 use a3s_memory::InMemoryStore;
2166 let store = Arc::new(InMemoryStore::new());
2167 let agent = Agent::from_config(test_config()).await.unwrap();
2168 let opts = SessionOptions::new().with_memory(store);
2169 let session = agent
2170 .session("/tmp/test-ws-mem-recall", Some(opts))
2171 .unwrap();
2172
2173 let memory = session.memory().unwrap();
2174 memory
2175 .remember_success("write a file", &["write".to_string()], "done")
2176 .await
2177 .unwrap();
2178
2179 let results = memory.recall_similar("write", 5).await.unwrap();
2180 assert!(!results.is_empty());
2181 let stats = memory.stats().await.unwrap();
2182 assert_eq!(stats.long_term_count, 1);
2183 }
2184
2185 #[tokio::test(flavor = "multi_thread")]
2190 async fn test_session_tool_timeout_configured() {
2191 let agent = Agent::from_config(test_config()).await.unwrap();
2192 let opts = SessionOptions::new().with_tool_timeout(5000);
2193 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
2194 assert!(!session.id().is_empty());
2195 }
2196
2197 #[tokio::test(flavor = "multi_thread")]
2202 async fn test_session_without_queue_builds_ok() {
2203 let agent = Agent::from_config(test_config()).await.unwrap();
2204 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
2205 assert!(!session.id().is_empty());
2206 }
2207
2208 #[tokio::test(flavor = "multi_thread")]
2213 async fn test_concurrent_history_reads() {
2214 let agent = Agent::from_config(test_config()).await.unwrap();
2215 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
2216
2217 let handles: Vec<_> = (0..10)
2218 .map(|_| {
2219 let s = Arc::clone(&session);
2220 tokio::spawn(async move { s.history().len() })
2221 })
2222 .collect();
2223
2224 for h in handles {
2225 h.await.unwrap();
2226 }
2227 }
2228
2229 #[tokio::test(flavor = "multi_thread")]
2234 async fn test_session_no_init_warning_without_file_memory() {
2235 let agent = Agent::from_config(test_config()).await.unwrap();
2236 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
2237 assert!(session.init_warning().is_none());
2238 }
2239
2240 #[tokio::test(flavor = "multi_thread")]
2241 async fn test_session_with_mcp_manager_builds_ok() {
2242 use crate::mcp::manager::McpManager;
2243 let mcp = Arc::new(McpManager::new());
2244 let agent = Agent::from_config(test_config()).await.unwrap();
2245 let opts = SessionOptions::new().with_mcp(mcp);
2246 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
2248 assert!(!session.id().is_empty());
2249 }
2250
2251 #[test]
2252 fn test_session_command_is_pub() {
2253 use crate::SessionCommand;
2255 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
2256 }
2257
2258 #[tokio::test(flavor = "multi_thread")]
2259 async fn test_session_submit_with_queue_executes() {
2260 let agent = Agent::from_config(test_config()).await.unwrap();
2261 let qc = SessionQueueConfig::default();
2262 let opts = SessionOptions::new().with_queue_config(qc);
2263 let session = agent
2264 .session("/tmp/test-ws-submit-exec", Some(opts))
2265 .unwrap();
2266
2267 struct Echo(serde_json::Value);
2268 #[async_trait::async_trait]
2269 impl crate::queue::SessionCommand for Echo {
2270 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2271 Ok(self.0.clone())
2272 }
2273 fn command_type(&self) -> &str { "echo" }
2274 }
2275
2276 let rx = session
2277 .submit(SessionLane::Query, Box::new(Echo(serde_json::json!({"ok": true}))))
2278 .await
2279 .expect("submit should succeed with queue configured");
2280
2281 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
2282 .await
2283 .expect("timed out waiting for command result")
2284 .expect("channel closed before result")
2285 .expect("command returned an error");
2286
2287 assert_eq!(result["ok"], true);
2288 }
2289}