1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::SystemPromptSlots;
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::tools::{ToolContext, ToolExecutor};
35use a3s_lane::{DeadLetter, MetricsSnapshot};
36use a3s_memory::{FileMemoryStore, MemoryStore};
37use anyhow::Context;
38use std::collections::HashMap;
39use std::path::{Path, PathBuf};
40use std::sync::atomic::{AtomicBool, Ordering};
41use std::sync::{Arc, RwLock};
42use tokio::sync::{broadcast, mpsc};
43use tokio::task::JoinHandle;
44
45fn safe_canonicalize(path: &Path) -> PathBuf {
48 match std::fs::canonicalize(path) {
49 Ok(p) => strip_unc_prefix(p),
50 Err(_) => path.to_path_buf(),
51 }
52}
53
/// Removes the Windows verbatim prefix from a path.
///
/// On Windows:
/// * `\\?\UNC\server\share\…` becomes `\\server\share\…`. Stripping only
///   `\\?\` here would leave `UNC\server\share\…`, which is a relative path
///   pointing somewhere else entirely.
/// * `\\?\C:\…` becomes `C:\…`.
///
/// On every other platform, and for paths without the prefix, the path is
/// returned unchanged.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        let s = path.to_string_lossy();
        // The verbatim-UNC form must be checked first: it also starts with `\\?\`.
        if let Some(share) = s.strip_prefix(r"\\?\UNC\") {
            return PathBuf::from(format!(r"\\{share}"));
        }
        if let Some(stripped) = s.strip_prefix(r"\\?\") {
            return PathBuf::from(stripped);
        }
    }
    path
}
66
/// Plain-data record describing the outcome of one tool call.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Tool name. NOTE(review): presumably the name of the tool that was
    /// invoked — confirm against the code that constructs this value.
    pub name: String,
    /// Textual output associated with the call.
    pub output: String,
    /// Numeric exit code. NOTE(review): presumably `0` means success,
    /// following process conventions — confirm against the producer.
    pub exit_code: i32,
}
78
/// Per-session overrides and extensions applied when a session is built.
///
/// Every field has a `Default`, so `SessionOptions::default()` yields a
/// session that relies entirely on the agent-level configuration.
#[derive(Clone, Default)]
pub struct SessionOptions {
    /// Model override in `provider/model` form (split on the first `/`).
    pub model: Option<String>,
    /// Extra directories scanned for agent definitions, in addition to the
    /// agent-level `agent_dirs`.
    pub agent_dirs: Vec<PathBuf>,
    /// When set, a `SessionLaneQueue` is created for the session.
    pub queue_config: Option<SessionQueueConfig>,
    /// Security provider copied into the session's `AgentConfig`.
    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
    /// Context providers copied into the session's `AgentConfig`.
    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
    /// Confirmation provider copied into the session's `AgentConfig`.
    pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
    /// Permission checker copied into the session's `AgentConfig`.
    pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
    /// Copied into `AgentConfig::planning_enabled`.
    pub planning_enabled: bool,
    /// Copied into `AgentConfig::goal_tracking`.
    pub goal_tracking: bool,
    /// Directories whose skills are loaded into the session's skill registry.
    pub skill_dirs: Vec<PathBuf>,
    /// Registry whose skills are merged into the session's fork of the
    /// agent-level skill registry.
    pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
    /// Explicit memory store; takes precedence over `file_memory_dir`.
    pub memory_store: Option<Arc<dyn MemoryStore>>,
    /// Directory for a `FileMemoryStore` created while the session is built
    /// (used only when `memory_store` is `None`).
    pub(crate) file_memory_dir: Option<PathBuf>,
    /// Explicit session store; takes precedence over the configured
    /// `sessions_dir`.
    pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// Explicit session id; a random UUID is generated when absent.
    pub session_id: Option<String>,
    /// Copied into the session's `auto_save` flag.
    pub auto_save: bool,
    /// Overrides the agent-level `max_parse_retries` when set.
    pub max_parse_retries: Option<u32>,
    /// Overrides the agent-level `tool_timeout_ms` when set.
    pub tool_timeout_ms: Option<u64>,
    /// Overrides the agent-level `circuit_breaker_threshold` when set.
    pub circuit_breaker_threshold: Option<u32>,
    /// Sandbox configuration. On its own it only produces a warning at
    /// session build time; an actual `sandbox_handle` is required for
    /// sandboxed execution.
    pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
    /// Sandbox implementation installed on the tool registry and tool
    /// context of the session.
    pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
    /// Copied into `AgentConfig::auto_compact`.
    pub auto_compact: bool,
    /// Auto-compaction threshold; falls back to
    /// `crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD` when absent.
    pub auto_compact_threshold: Option<f32>,
    /// Overrides the agent-level `continuation_enabled` when set.
    pub continuation_enabled: Option<bool>,
    /// Overrides the agent-level `max_continuation_turns` when set.
    pub max_continuation_turns: Option<u32>,
    /// Session-level MCP manager.
    pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
    /// LLM temperature; only applied together with a `model` override.
    pub temperature: Option<f32>,
    /// LLM thinking budget; only applied together with a `model` override.
    pub thinking_budget: Option<usize>,
    /// Overrides the agent-level `max_tool_rounds` when set.
    pub max_tool_rounds: Option<usize>,
    /// System prompt slots; the agent-level slots are used when absent.
    pub prompt_slots: Option<SystemPromptSlots>,
    /// Hook executor handed to the session.
    pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Document parser registry passed to plugins and the tool context.
    pub document_parser_registry: Option<Arc<crate::document_parser::DocumentParserRegistry>>,
    /// Plugins loaded into the session's tool registry at build time.
    pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
}
208
209impl std::fmt::Debug for SessionOptions {
210 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
211 f.debug_struct("SessionOptions")
212 .field("model", &self.model)
213 .field("agent_dirs", &self.agent_dirs)
214 .field("skill_dirs", &self.skill_dirs)
215 .field("queue_config", &self.queue_config)
216 .field("security_provider", &self.security_provider.is_some())
217 .field("context_providers", &self.context_providers.len())
218 .field("confirmation_manager", &self.confirmation_manager.is_some())
219 .field("permission_checker", &self.permission_checker.is_some())
220 .field("planning_enabled", &self.planning_enabled)
221 .field("goal_tracking", &self.goal_tracking)
222 .field(
223 "skill_registry",
224 &self
225 .skill_registry
226 .as_ref()
227 .map(|r| format!("{} skills", r.len())),
228 )
229 .field("memory_store", &self.memory_store.is_some())
230 .field("session_store", &self.session_store.is_some())
231 .field("session_id", &self.session_id)
232 .field("auto_save", &self.auto_save)
233 .field("max_parse_retries", &self.max_parse_retries)
234 .field("tool_timeout_ms", &self.tool_timeout_ms)
235 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
236 .field("sandbox_config", &self.sandbox_config)
237 .field("auto_compact", &self.auto_compact)
238 .field("auto_compact_threshold", &self.auto_compact_threshold)
239 .field("continuation_enabled", &self.continuation_enabled)
240 .field("max_continuation_turns", &self.max_continuation_turns)
241 .field(
242 "plugins",
243 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
244 )
245 .field("mcp_manager", &self.mcp_manager.is_some())
246 .field("temperature", &self.temperature)
247 .field("thinking_budget", &self.thinking_budget)
248 .field("max_tool_rounds", &self.max_tool_rounds)
249 .field("prompt_slots", &self.prompt_slots.is_some())
250 .finish()
251 }
252}
253
254impl SessionOptions {
255 pub fn new() -> Self {
256 Self::default()
257 }
258
259 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
264 self.plugins.push(std::sync::Arc::new(plugin));
265 self
266 }
267
268 pub fn with_model(mut self, model: impl Into<String>) -> Self {
269 self.model = Some(model.into());
270 self
271 }
272
273 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
274 self.agent_dirs.push(dir.into());
275 self
276 }
277
278 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
279 self.queue_config = Some(config);
280 self
281 }
282
283 pub fn with_default_security(mut self) -> Self {
285 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
286 self
287 }
288
289 pub fn with_security_provider(
291 mut self,
292 provider: Arc<dyn crate::security::SecurityProvider>,
293 ) -> Self {
294 self.security_provider = Some(provider);
295 self
296 }
297
298 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
300 let config = crate::context::FileSystemContextConfig::new(root_path);
301 self.context_providers
302 .push(Arc::new(crate::context::FileSystemContextProvider::new(
303 config,
304 )));
305 self
306 }
307
308 pub fn with_context_provider(
310 mut self,
311 provider: Arc<dyn crate::context::ContextProvider>,
312 ) -> Self {
313 self.context_providers.push(provider);
314 self
315 }
316
317 pub fn with_confirmation_manager(
319 mut self,
320 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
321 ) -> Self {
322 self.confirmation_manager = Some(manager);
323 self
324 }
325
326 pub fn with_permission_checker(
328 mut self,
329 checker: Arc<dyn crate::permissions::PermissionChecker>,
330 ) -> Self {
331 self.permission_checker = Some(checker);
332 self
333 }
334
335 pub fn with_permissive_policy(self) -> Self {
342 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
343 }
344
345 pub fn with_planning(mut self, enabled: bool) -> Self {
347 self.planning_enabled = enabled;
348 self
349 }
350
351 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
353 self.goal_tracking = enabled;
354 self
355 }
356
357 pub fn with_builtin_skills(mut self) -> Self {
359 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
360 self
361 }
362
363 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
365 self.skill_registry = Some(registry);
366 self
367 }
368
369 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
372 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
373 self
374 }
375
376 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
378 let registry = self
379 .skill_registry
380 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
381 if let Err(e) = registry.load_from_dir(&dir) {
382 tracing::warn!(
383 dir = %dir.as_ref().display(),
384 error = %e,
385 "Failed to load skills from directory — continuing without them"
386 );
387 }
388 self.skill_registry = Some(registry);
389 self
390 }
391
392 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
394 self.memory_store = Some(store);
395 self
396 }
397
398 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
404 self.file_memory_dir = Some(dir.into());
405 self
406 }
407
408 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
410 self.session_store = Some(store);
411 self
412 }
413
414 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
416 let dir = dir.into();
417 match tokio::runtime::Handle::try_current() {
418 Ok(handle) => {
419 match tokio::task::block_in_place(|| {
420 handle.block_on(crate::store::FileSessionStore::new(dir))
421 }) {
422 Ok(store) => {
423 self.session_store =
424 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
425 }
426 Err(e) => {
427 tracing::warn!("Failed to create file session store: {}", e);
428 }
429 }
430 }
431 Err(_) => {
432 tracing::warn!(
433 "No async runtime available for file session store — persistence disabled"
434 );
435 }
436 }
437 self
438 }
439
440 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
442 self.session_id = Some(id.into());
443 self
444 }
445
446 pub fn with_auto_save(mut self, enabled: bool) -> Self {
448 self.auto_save = enabled;
449 self
450 }
451
452 pub fn with_parse_retries(mut self, max: u32) -> Self {
458 self.max_parse_retries = Some(max);
459 self
460 }
461
462 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
468 self.tool_timeout_ms = Some(timeout_ms);
469 self
470 }
471
472 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
478 self.circuit_breaker_threshold = Some(threshold);
479 self
480 }
481
482 pub fn with_resilience_defaults(self) -> Self {
488 self.with_parse_retries(2)
489 .with_tool_timeout(120_000)
490 .with_circuit_breaker(3)
491 }
492
493 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
512 self.sandbox_config = Some(config);
513 self
514 }
515
516 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
524 self.sandbox_handle = Some(handle);
525 self
526 }
527
528 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
533 self.auto_compact = enabled;
534 self
535 }
536
537 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
539 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
540 self
541 }
542
543 pub fn with_continuation(mut self, enabled: bool) -> Self {
548 self.continuation_enabled = Some(enabled);
549 self
550 }
551
552 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
554 self.max_continuation_turns = Some(turns);
555 self
556 }
557
558 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
563 self.mcp_manager = Some(manager);
564 self
565 }
566
567 pub fn with_temperature(mut self, temperature: f32) -> Self {
568 self.temperature = Some(temperature);
569 self
570 }
571
572 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
573 self.thinking_budget = Some(budget);
574 self
575 }
576
577 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
582 self.max_tool_rounds = Some(rounds);
583 self
584 }
585
586 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
591 self.prompt_slots = Some(slots);
592 self
593 }
594
595 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
601 self.hook_executor = Some(executor);
602 self
603 }
604}
605
/// Factory for sessions: holds the loaded configuration, the default LLM
/// client and the agent-wide MCP state shared by the sessions it creates.
pub struct Agent {
    /// Default LLM client, used by sessions that do not override the model.
    llm_client: Arc<dyn LlmClient>,
    /// Loaded configuration (providers, directories, MCP servers, …).
    code_config: CodeConfig,
    /// Base agent configuration that each session's configuration starts from.
    config: AgentConfig,
    /// MCP manager for the servers listed in the configuration; `None` when
    /// the configuration lists no servers.
    global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
    /// Cached `(server name, tool)` pairs from `global_mcp`, refreshed by
    /// `refresh_mcp_tools`.
    global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
}
624
625impl std::fmt::Debug for Agent {
626 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
627 f.debug_struct("Agent").finish()
628 }
629}
630
631impl Agent {
632 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
636 let source = config_source.into();
637
638 let expanded = if let Some(rest) = source.strip_prefix("~/") {
640 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
641 if let Some(home) = home {
642 PathBuf::from(home).join(rest).display().to_string()
643 } else {
644 source.clone()
645 }
646 } else {
647 source.clone()
648 };
649
650 let path = Path::new(&expanded);
651
652 let config = if matches!(
653 path.extension().and_then(|ext| ext.to_str()),
654 Some("hcl" | "json")
655 ) {
656 if !path.exists() {
657 return Err(CodeError::Config(format!(
658 "Config file not found: {}",
659 path.display()
660 )));
661 }
662
663 CodeConfig::from_file(path)
664 .with_context(|| format!("Failed to load config: {}", path.display()))?
665 } else {
666 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
668 };
669
670 Self::from_config(config).await
671 }
672
/// Alias for [`Agent::new`]: forwards `config_source` unchanged and returns
/// its result.
pub async fn create(config_source: impl Into<String>) -> Result<Self> {
    Self::new(config_source).await
}
680
681 pub async fn from_config(config: CodeConfig) -> Result<Self> {
683 let llm_config = config
684 .default_llm_config()
685 .context("default_model must be set in 'provider/model' format with a valid API key")?;
686 let llm_client = crate::llm::create_client_with_config(llm_config);
687
688 let agent_config = AgentConfig {
689 max_tool_rounds: config
690 .max_tool_rounds
691 .unwrap_or(AgentConfig::default().max_tool_rounds),
692 ..AgentConfig::default()
693 };
694
695 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
697 (None, vec![])
698 } else {
699 let manager = Arc::new(crate::mcp::manager::McpManager::new());
700 for server in &config.mcp_servers {
701 if !server.enabled {
702 continue;
703 }
704 manager.register_server(server.clone()).await;
705 if let Err(e) = manager.connect(&server.name).await {
706 tracing::warn!(
707 server = %server.name,
708 error = %e,
709 "Failed to connect to MCP server — skipping"
710 );
711 }
712 }
713 let tools = manager.get_all_tools().await;
715 (Some(manager), tools)
716 };
717
718 let mut agent = Agent {
719 llm_client,
720 code_config: config,
721 config: agent_config,
722 global_mcp,
723 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
724 };
725
726 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
728 for dir in &agent.code_config.skill_dirs.clone() {
729 if let Err(e) = registry.load_from_dir(dir) {
730 tracing::warn!(
731 dir = %dir.display(),
732 error = %e,
733 "Failed to load skills from directory — skipping"
734 );
735 }
736 }
737 agent.config.skill_registry = Some(registry);
738
739 Ok(agent)
740 }
741
742 pub async fn refresh_mcp_tools(&self) -> Result<()> {
750 if let Some(ref mcp) = self.global_mcp {
751 let fresh = mcp.get_all_tools().await;
752 *self
753 .global_mcp_tools
754 .lock()
755 .expect("global_mcp_tools lock poisoned") = fresh;
756 }
757 Ok(())
758 }
759
760 pub fn session(
765 &self,
766 workspace: impl Into<String>,
767 options: Option<SessionOptions>,
768 ) -> Result<AgentSession> {
769 let opts = options.unwrap_or_default();
770
771 let llm_client = if let Some(ref model) = opts.model {
772 let (provider_name, model_id) = model
773 .split_once('/')
774 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
775
776 let mut llm_config = self
777 .code_config
778 .llm_config(provider_name, model_id)
779 .with_context(|| {
780 format!("provider '{provider_name}' or model '{model_id}' not found in config")
781 })?;
782
783 if let Some(temp) = opts.temperature {
784 llm_config = llm_config.with_temperature(temp);
785 }
786 if let Some(budget) = opts.thinking_budget {
787 llm_config = llm_config.with_thinking_budget(budget);
788 }
789
790 crate::llm::create_client_with_config(llm_config)
791 } else {
792 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
793 tracing::warn!(
794 "temperature/thinking_budget set without model override — these will be ignored. \
795 Use with_model() to apply LLM parameter overrides."
796 );
797 }
798 self.llm_client.clone()
799 };
800
801 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
804 (Some(global), Some(session)) => {
805 let global = Arc::clone(global);
806 let session_mgr = Arc::clone(session);
807 match tokio::runtime::Handle::try_current() {
808 Ok(handle) => {
809 let global_for_merge = Arc::clone(&global);
810 tokio::task::block_in_place(|| {
811 handle.block_on(async move {
812 for config in session_mgr.all_configs().await {
813 let name = config.name.clone();
814 global_for_merge.register_server(config).await;
815 if let Err(e) = global_for_merge.connect(&name).await {
816 tracing::warn!(
817 server = %name,
818 error = %e,
819 "Failed to connect session-level MCP server — skipping"
820 );
821 }
822 }
823 })
824 });
825 }
826 Err(_) => {
827 tracing::warn!(
828 "No async runtime available to merge session-level MCP servers \
829 into global manager — session MCP servers will not be available"
830 );
831 }
832 }
833 SessionOptions {
834 mcp_manager: Some(Arc::clone(&global)),
835 ..opts
836 }
837 }
838 (Some(global), None) => SessionOptions {
839 mcp_manager: Some(Arc::clone(global)),
840 ..opts
841 },
842 _ => opts,
843 };
844
845 self.build_session(workspace.into(), llm_client, &merged_opts)
846 }
847
848 pub fn session_for_agent(
863 &self,
864 workspace: impl Into<String>,
865 def: &crate::subagent::AgentDefinition,
866 extra: Option<SessionOptions>,
867 ) -> Result<AgentSession> {
868 let mut opts = extra.unwrap_or_default();
869
870 if opts.permission_checker.is_none()
872 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
873 {
874 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
875 }
876
877 if opts.max_tool_rounds.is_none() {
879 if let Some(steps) = def.max_steps {
880 opts.max_tool_rounds = Some(steps);
881 }
882 }
883
884 if opts.model.is_none() {
886 if let Some(ref m) = def.model {
887 let provider = m.provider.as_deref().unwrap_or("anthropic");
888 opts.model = Some(format!("{}/{}", provider, m.model));
889 }
890 }
891
892 if let Some(ref prompt) = def.prompt {
899 let slots = opts
900 .prompt_slots
901 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
902 if slots.extra.is_none() {
903 slots.extra = Some(prompt.clone());
904 }
905 }
906
907 self.session(workspace, Some(opts))
908 }
909
910 pub fn resume_session(
918 &self,
919 session_id: &str,
920 options: SessionOptions,
921 ) -> Result<AgentSession> {
922 let store = options.session_store.as_ref().ok_or_else(|| {
923 crate::error::CodeError::Session(
924 "resume_session requires a session_store in SessionOptions".to_string(),
925 )
926 })?;
927
928 let data = match tokio::runtime::Handle::try_current() {
930 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
931 .map_err(|e| {
932 crate::error::CodeError::Session(format!(
933 "Failed to load session {}: {}",
934 session_id, e
935 ))
936 })?,
937 Err(_) => {
938 return Err(crate::error::CodeError::Session(
939 "No async runtime available for session resume".to_string(),
940 ))
941 }
942 };
943
944 let data = data.ok_or_else(|| {
945 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
946 })?;
947
948 let mut opts = options;
950 opts.session_id = Some(data.id.clone());
951
952 let llm_client = if let Some(ref model) = opts.model {
953 let (provider_name, model_id) = model
954 .split_once('/')
955 .context("model format must be 'provider/model'")?;
956 let llm_config = self
957 .code_config
958 .llm_config(provider_name, model_id)
959 .with_context(|| {
960 format!("provider '{provider_name}' or model '{model_id}' not found")
961 })?;
962 crate::llm::create_client_with_config(llm_config)
963 } else {
964 self.llm_client.clone()
965 };
966
967 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
968
969 *write_or_recover(&session.history) = data.messages;
971
972 Ok(session)
973 }
974
975 fn build_session(
976 &self,
977 workspace: String,
978 llm_client: Arc<dyn LlmClient>,
979 opts: &SessionOptions,
980 ) -> Result<AgentSession> {
981 let canonical = safe_canonicalize(Path::new(&workspace));
982
983 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
984
985 let agent_registry = {
989 use crate::subagent::{load_agents_from_dir, AgentRegistry};
990 use crate::tools::register_task_with_mcp;
991 let registry = AgentRegistry::new();
992 for dir in self
993 .code_config
994 .agent_dirs
995 .iter()
996 .chain(opts.agent_dirs.iter())
997 {
998 for agent in load_agents_from_dir(dir) {
999 registry.register(agent);
1000 }
1001 }
1002 let registry = Arc::new(registry);
1003 register_task_with_mcp(
1004 tool_executor.registry(),
1005 Arc::clone(&llm_client),
1006 Arc::clone(®istry),
1007 canonical.display().to_string(),
1008 opts.mcp_manager.clone(),
1009 );
1010 registry
1011 };
1012
1013 if let Some(ref mcp) = opts.mcp_manager {
1016 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1019 Arc::as_ptr(mcp),
1020 self.global_mcp
1021 .as_ref()
1022 .map(Arc::as_ptr)
1023 .unwrap_or(std::ptr::null()),
1024 ) {
1025 self.global_mcp_tools
1027 .lock()
1028 .expect("global_mcp_tools lock poisoned")
1029 .clone()
1030 } else {
1031 match tokio::runtime::Handle::try_current() {
1033 Ok(handle) => {
1034 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1035 }
1036 Err(_) => {
1037 tracing::warn!(
1038 "No async runtime available for session-level MCP tools — \
1039 MCP tools will not be registered"
1040 );
1041 vec![]
1042 }
1043 }
1044 };
1045
1046 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1047 std::collections::HashMap::new();
1048 for (server, tool) in all_tools {
1049 by_server.entry(server).or_default().push(tool);
1050 }
1051 for (server_name, tools) in by_server {
1052 for tool in
1053 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1054 {
1055 tool_executor.register_dynamic_tool(tool);
1056 }
1057 }
1058 }
1059
1060 let tool_defs = tool_executor.definitions();
1061
1062 let mut prompt_slots = opts
1064 .prompt_slots
1065 .clone()
1066 .unwrap_or_else(|| self.config.prompt_slots.clone());
1067
1068 let agents_md_path = canonical.join("AGENTS.md");
1070 if agents_md_path.exists() && agents_md_path.is_file() {
1071 match std::fs::read_to_string(&agents_md_path) {
1072 Ok(content) if !content.trim().is_empty() => {
1073 tracing::info!(
1074 path = %agents_md_path.display(),
1075 "Auto-loaded AGENTS.md from workspace root"
1076 );
1077 prompt_slots.extra = match prompt_slots.extra {
1078 Some(existing) => Some(format!(
1079 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1080 existing, content
1081 )),
1082 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1083 };
1084 }
1085 Ok(_) => {
1086 tracing::debug!(
1087 path = %agents_md_path.display(),
1088 "AGENTS.md exists but is empty — skipping"
1089 );
1090 }
1091 Err(e) => {
1092 tracing::warn!(
1093 path = %agents_md_path.display(),
1094 error = %e,
1095 "Failed to read AGENTS.md — skipping"
1096 );
1097 }
1098 }
1099 }
1100
1101 let base_registry = self
1105 .config
1106 .skill_registry
1107 .as_deref()
1108 .map(|r| r.fork())
1109 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1110 if let Some(ref r) = opts.skill_registry {
1112 for skill in r.all() {
1113 base_registry.register_unchecked(skill);
1114 }
1115 }
1116 for dir in &opts.skill_dirs {
1118 if let Err(e) = base_registry.load_from_dir(dir) {
1119 tracing::warn!(
1120 dir = %dir.display(),
1121 error = %e,
1122 "Failed to load session skill dir — skipping"
1123 );
1124 }
1125 }
1126 let effective_registry = Arc::new(base_registry);
1127
1128 if !opts.plugins.is_empty() {
1131 use crate::plugin::PluginContext;
1132 let mut plugin_ctx = PluginContext::new()
1133 .with_llm(Arc::clone(&self.llm_client))
1134 .with_skill_registry(Arc::clone(&effective_registry));
1135 if let Some(ref dp) = opts.document_parser_registry {
1136 plugin_ctx = plugin_ctx.with_document_parsers(Arc::clone(dp));
1137 }
1138 let plugin_registry = tool_executor.registry();
1139 for plugin in &opts.plugins {
1140 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1141 match plugin.load(plugin_registry, &plugin_ctx) {
1142 Ok(()) => {
1143 for skill in plugin.skills() {
1144 tracing::debug!(
1145 "Plugin '{}' registered skill '{}'",
1146 plugin.name(),
1147 skill.name
1148 );
1149 effective_registry.register_unchecked(skill);
1150 }
1151 }
1152 Err(e) => {
1153 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1154 }
1155 }
1156 }
1157 }
1158
1159 let skill_prompt = effective_registry.to_system_prompt();
1161 if !skill_prompt.is_empty() {
1162 prompt_slots.extra = match prompt_slots.extra {
1163 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1164 None => Some(skill_prompt),
1165 };
1166 }
1167
1168 let mut init_warning: Option<String> = None;
1170 let memory = {
1171 let store = if let Some(ref store) = opts.memory_store {
1172 Some(Arc::clone(store))
1173 } else if let Some(ref dir) = opts.file_memory_dir {
1174 match tokio::runtime::Handle::try_current() {
1175 Ok(handle) => {
1176 let dir = dir.clone();
1177 match tokio::task::block_in_place(|| {
1178 handle.block_on(FileMemoryStore::new(dir))
1179 }) {
1180 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1181 Err(e) => {
1182 let msg = format!("Failed to create file memory store: {}", e);
1183 tracing::warn!("{}", msg);
1184 init_warning = Some(msg);
1185 None
1186 }
1187 }
1188 }
1189 Err(_) => {
1190 let msg =
1191 "No async runtime available for file memory store — memory disabled"
1192 .to_string();
1193 tracing::warn!("{}", msg);
1194 init_warning = Some(msg);
1195 None
1196 }
1197 }
1198 } else {
1199 None
1200 };
1201 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1202 };
1203
1204 let base = self.config.clone();
1205 let config = AgentConfig {
1206 prompt_slots,
1207 tools: tool_defs,
1208 security_provider: opts.security_provider.clone(),
1209 permission_checker: opts.permission_checker.clone(),
1210 confirmation_manager: opts.confirmation_manager.clone(),
1211 context_providers: opts.context_providers.clone(),
1212 planning_enabled: opts.planning_enabled,
1213 goal_tracking: opts.goal_tracking,
1214 skill_registry: Some(Arc::clone(&effective_registry)),
1215 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1216 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1217 circuit_breaker_threshold: opts
1218 .circuit_breaker_threshold
1219 .unwrap_or(base.circuit_breaker_threshold),
1220 auto_compact: opts.auto_compact,
1221 auto_compact_threshold: opts
1222 .auto_compact_threshold
1223 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1224 max_context_tokens: base.max_context_tokens,
1225 llm_client: Some(Arc::clone(&llm_client)),
1226 memory: memory.clone(),
1227 continuation_enabled: opts
1228 .continuation_enabled
1229 .unwrap_or(base.continuation_enabled),
1230 max_continuation_turns: opts
1231 .max_continuation_turns
1232 .unwrap_or(base.max_continuation_turns),
1233 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1234 ..base
1235 };
1236
1237 {
1241 use crate::tools::register_skill;
1242 register_skill(
1243 tool_executor.registry(),
1244 Arc::clone(&llm_client),
1245 Arc::clone(&effective_registry),
1246 Arc::clone(&tool_executor),
1247 config.clone(),
1248 );
1249 }
1250
1251 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1254 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1255 let session_id = uuid::Uuid::new_v4().to_string();
1256 let rt = tokio::runtime::Handle::try_current();
1257
1258 match rt {
1259 Ok(handle) => {
1260 let queue = tokio::task::block_in_place(|| {
1262 handle.block_on(SessionLaneQueue::new(
1263 &session_id,
1264 queue_config.clone(),
1265 agent_event_tx.clone(),
1266 ))
1267 });
1268 match queue {
1269 Ok(q) => {
1270 let q = Arc::new(q);
1272 let q2 = Arc::clone(&q);
1273 tokio::task::block_in_place(|| {
1274 handle.block_on(async { q2.start().await.ok() })
1275 });
1276 Some(q)
1277 }
1278 Err(e) => {
1279 tracing::warn!("Failed to create session lane queue: {}", e);
1280 None
1281 }
1282 }
1283 }
1284 Err(_) => {
1285 tracing::warn!(
1286 "No async runtime available for queue creation — queue disabled"
1287 );
1288 None
1289 }
1290 }
1291 } else {
1292 None
1293 };
1294
1295 let mut tool_context = ToolContext::new(canonical.clone());
1297 if let Some(ref search_config) = self.code_config.search {
1298 tool_context = tool_context.with_search_config(search_config.clone());
1299 }
1300 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1301
1302 if let Some(ref registry) = opts.document_parser_registry {
1304 tool_context = tool_context.with_document_parsers(Arc::clone(registry));
1305 }
1306
1307 if let Some(handle) = opts.sandbox_handle.clone() {
1309 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1310 tool_context = tool_context.with_sandbox(handle);
1311 } else if opts.sandbox_config.is_some() {
1312 tracing::warn!(
1313 "sandbox_config is set but no sandbox_handle was provided \
1314 — bash commands will run locally"
1315 );
1316 }
1317
1318 let session_id = opts
1319 .session_id
1320 .clone()
1321 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1322
1323 let session_store = if opts.session_store.is_some() {
1325 opts.session_store.clone()
1326 } else if let Some(ref dir) = self.code_config.sessions_dir {
1327 match tokio::runtime::Handle::try_current() {
1328 Ok(handle) => {
1329 let dir = dir.clone();
1330 match tokio::task::block_in_place(|| {
1331 handle.block_on(crate::store::FileSessionStore::new(dir))
1332 }) {
1333 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1334 Err(e) => {
1335 tracing::warn!(
1336 "Failed to create session store from sessions_dir: {}",
1337 e
1338 );
1339 None
1340 }
1341 }
1342 }
1343 Err(_) => {
1344 tracing::warn!(
1345 "No async runtime for sessions_dir store — persistence disabled"
1346 );
1347 None
1348 }
1349 }
1350 } else {
1351 None
1352 };
1353
1354 let (cron_scheduler, cron_rx) = CronScheduler::new();
1358 let mut command_registry = CommandRegistry::new();
1359 command_registry.register(Arc::new(LoopCommand {
1360 scheduler: Arc::clone(&cron_scheduler),
1361 }));
1362 command_registry.register(Arc::new(CronListCommand {
1363 scheduler: Arc::clone(&cron_scheduler),
1364 }));
1365 command_registry.register(Arc::new(CronCancelCommand {
1366 scheduler: Arc::clone(&cron_scheduler),
1367 }));
1368
1369 Ok(AgentSession {
1370 llm_client,
1371 tool_executor,
1372 tool_context,
1373 memory: config.memory.clone(),
1374 config,
1375 workspace: canonical,
1376 session_id,
1377 history: RwLock::new(Vec::new()),
1378 command_queue,
1379 session_store,
1380 auto_save: opts.auto_save,
1381 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1382 ahp_executor: opts.hook_executor.clone(),
1383 init_warning,
1384 command_registry: std::sync::Mutex::new(command_registry),
1385 model_name: opts
1386 .model
1387 .clone()
1388 .or_else(|| self.code_config.default_model.clone())
1389 .unwrap_or_else(|| "unknown".to_string()),
1390 mcp_manager: opts
1391 .mcp_manager
1392 .clone()
1393 .or_else(|| self.global_mcp.clone())
1394 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1395 agent_registry,
1396 cron_scheduler,
1397 cron_rx: tokio::sync::Mutex::new(cron_rx),
1398 is_processing_cron: AtomicBool::new(false),
1399 cron_started: AtomicBool::new(false),
1400 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1401 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1402 })
1403 }
1404}
1405
1406#[derive(Debug, Clone)]
1415pub struct BtwResult {
1416 pub question: String,
1418 pub answer: String,
1420 pub usage: crate::llm::TokenUsage,
1422}
1423
1424pub struct AgentSession {
1433 llm_client: Arc<dyn LlmClient>,
1434 tool_executor: Arc<ToolExecutor>,
1435 tool_context: ToolContext,
1436 config: AgentConfig,
1437 workspace: PathBuf,
1438 session_id: String,
1440 history: RwLock<Vec<Message>>,
1442 command_queue: Option<Arc<SessionLaneQueue>>,
1444 memory: Option<Arc<crate::memory::AgentMemory>>,
1446 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1448 auto_save: bool,
1450 hook_engine: Arc<crate::hooks::HookEngine>,
1452 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1455 init_warning: Option<String>,
1457 command_registry: std::sync::Mutex<CommandRegistry>,
1460 model_name: String,
1462 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1464 agent_registry: Arc<crate::subagent::AgentRegistry>,
1466 cron_scheduler: Arc<CronScheduler>,
1468 cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
1470 is_processing_cron: AtomicBool,
1472 cron_started: AtomicBool,
1476 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1479 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1481}
1482
/// What the session remembers about a tool call that is currently running.
#[derive(Debug, Clone)]
struct ActiveToolSnapshot {
    /// Name of the running tool.
    tool_name: String,
    /// Start time in milliseconds since the Unix epoch.
    started_at_ms: u64,
}
1488
1489impl std::fmt::Debug for AgentSession {
1490 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1491 f.debug_struct("AgentSession")
1492 .field("session_id", &self.session_id)
1493 .field("workspace", &self.workspace.display().to_string())
1494 .field("auto_save", &self.auto_save)
1495 .finish()
1496 }
1497}
1498
1499impl AgentSession {
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns 0 when the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    match std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1506
1507 fn compact_json_value(value: &serde_json::Value) -> String {
1508 let raw = match value {
1509 serde_json::Value::Null => String::new(),
1510 serde_json::Value::String(s) => s.clone(),
1511 _ => serde_json::to_string(value).unwrap_or_default(),
1512 };
1513 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1514 if compact.len() > 180 {
1515 format!("{}...", &compact[..180])
1516 } else {
1517 compact
1518 }
1519 }
1520
1521 async fn apply_runtime_event(
1522 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1523 event: &AgentEvent,
1524 ) {
1525 match event {
1526 AgentEvent::ToolStart { id, name } => {
1527 active_tools.write().await.insert(
1528 id.clone(),
1529 ActiveToolSnapshot {
1530 tool_name: name.clone(),
1531 started_at_ms: Self::now_ms(),
1532 },
1533 );
1534 }
1535 AgentEvent::ToolEnd { id, .. }
1536 | AgentEvent::PermissionDenied { tool_id: id, .. }
1537 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1538 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1539 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1540 active_tools.write().await.remove(id);
1541 }
1542 _ => {}
1543 }
1544 }
1545
1546 async fn clear_runtime_tracking(&self) {
1547 self.active_tools.write().await.clear();
1548 }
1549
1550 fn build_agent_loop(&self) -> AgentLoop {
1554 let mut config = self.config.clone();
1555 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1556 ahp.clone()
1557 } else {
1558 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1559 });
1560 config.tools = self.tool_executor.definitions();
1564 let mut agent_loop = AgentLoop::new(
1565 self.llm_client.clone(),
1566 self.tool_executor.clone(),
1567 self.tool_context.clone(),
1568 config,
1569 );
1570 if let Some(ref queue) = self.command_queue {
1571 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1572 }
1573 agent_loop
1574 }
1575
1576 fn build_command_context(&self) -> CommandContext {
1578 let history = read_or_recover(&self.history);
1579
1580 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1582
1583 let mut mcp_map: std::collections::HashMap<String, usize> =
1585 std::collections::HashMap::new();
1586 for name in &tool_names {
1587 if let Some(rest) = name.strip_prefix("mcp__") {
1588 if let Some((server, _)) = rest.split_once("__") {
1589 *mcp_map.entry(server.to_string()).or_default() += 1;
1590 }
1591 }
1592 }
1593 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1594 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1595
1596 CommandContext {
1597 session_id: self.session_id.clone(),
1598 workspace: self.workspace.display().to_string(),
1599 model: self.model_name.clone(),
1600 history_len: history.len(),
1601 total_tokens: 0,
1602 total_cost: 0.0,
1603 tool_names,
1604 mcp_servers,
1605 }
1606 }
1607
1608 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1612 self.command_registry
1613 .lock()
1614 .expect("command_registry lock poisoned")
1615 }
1616
1617 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1621 self.command_registry
1622 .lock()
1623 .expect("command_registry lock poisoned")
1624 .register(cmd);
1625 }
1626
1627 pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
1629 &self.cron_scheduler
1630 }
1631
1632 fn ensure_cron_started(&self) {
1637 if !self.cron_started.swap(true, Ordering::Relaxed) {
1638 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1639 }
1640 }
1641
1642 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1651 self.ensure_cron_started();
1654
1655 if CommandRegistry::is_command(prompt) {
1657 let ctx = self.build_command_context();
1658 let output = self.command_registry().dispatch(prompt, &ctx);
1659 if let Some(output) = output {
1661 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1663 let result = self.btw(question).await?;
1664 return Ok(AgentResult {
1665 text: result.answer,
1666 messages: history
1667 .map(|h| h.to_vec())
1668 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1669 tool_calls_count: 0,
1670 usage: result.usage,
1671 });
1672 }
1673 return Ok(AgentResult {
1674 text: output.text,
1675 messages: history
1676 .map(|h| h.to_vec())
1677 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1678 tool_calls_count: 0,
1679 usage: crate::llm::TokenUsage::default(),
1680 });
1681 }
1682 }
1683
1684 if let Some(ref w) = self.init_warning {
1685 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1686 }
1687 let agent_loop = self.build_agent_loop();
1688 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1689 let runtime_state = Arc::clone(&self.active_tools);
1690 let runtime_collector = tokio::spawn(async move {
1691 while let Some(event) = runtime_rx.recv().await {
1692 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1693 }
1694 });
1695
1696 let use_internal = history.is_none();
1697 let effective_history = match history {
1698 Some(h) => h.to_vec(),
1699 None => read_or_recover(&self.history).clone(),
1700 };
1701
1702 let cancel_token = tokio_util::sync::CancellationToken::new();
1703 *self.cancel_token.lock().await = Some(cancel_token.clone());
1704 let result = agent_loop
1705 .execute_with_session(
1706 &effective_history,
1707 prompt,
1708 Some(&self.session_id),
1709 Some(runtime_tx),
1710 Some(&cancel_token),
1711 )
1712 .await;
1713 *self.cancel_token.lock().await = None;
1714 let _ = runtime_collector.await;
1715 let result = result?;
1716
1717 if use_internal {
1720 *write_or_recover(&self.history) = result.messages.clone();
1721
1722 if self.auto_save {
1724 if let Err(e) = self.save().await {
1725 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1726 }
1727 }
1728 }
1729
1730 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1735 let fires = {
1736 let mut rx = self.cron_rx.lock().await;
1737 let mut fires: Vec<ScheduledFire> = Vec::new();
1738 while let Ok(fire) = rx.try_recv() {
1739 fires.push(fire);
1740 }
1741 fires
1742 };
1743 for fire in fires {
1744 tracing::debug!(
1745 task_id = %fire.task_id,
1746 "Firing scheduled cron task"
1747 );
1748 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1749 tracing::warn!(
1750 task_id = %fire.task_id,
1751 "Scheduled task failed: {e}"
1752 );
1753 }
1754 }
1755 self.is_processing_cron.store(false, Ordering::Relaxed);
1756 }
1757
1758 self.clear_runtime_tracking().await;
1759
1760 Ok(result)
1761 }
1762
1763 async fn build_btw_runtime_context(&self) -> String {
1764 let mut sections = Vec::new();
1765
1766 let active_tools = {
1767 let tools = self.active_tools.read().await;
1768 let mut items = tools
1769 .iter()
1770 .map(|(tool_id, tool)| {
1771 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1772 format!(
1773 "- {} [{}] running_for={}ms",
1774 tool.tool_name, tool_id, elapsed_ms
1775 )
1776 })
1777 .collect::<Vec<_>>();
1778 items.sort();
1779 items
1780 };
1781 if !active_tools.is_empty() {
1782 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1783 }
1784
1785 if let Some(cm) = &self.config.confirmation_manager {
1786 let pending = cm.pending_confirmations().await;
1787 if !pending.is_empty() {
1788 let mut lines = pending
1789 .into_iter()
1790 .map(
1791 |PendingConfirmationInfo {
1792 tool_id,
1793 tool_name,
1794 args,
1795 remaining_ms,
1796 }| {
1797 let arg_summary = Self::compact_json_value(&args);
1798 if arg_summary.is_empty() {
1799 format!(
1800 "- {} [{}] remaining={}ms",
1801 tool_name, tool_id, remaining_ms
1802 )
1803 } else {
1804 format!(
1805 "- {} [{}] remaining={}ms {}",
1806 tool_name, tool_id, remaining_ms, arg_summary
1807 )
1808 }
1809 },
1810 )
1811 .collect::<Vec<_>>();
1812 lines.sort();
1813 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1814 }
1815 }
1816
1817 if let Some(queue) = &self.command_queue {
1818 let stats = queue.stats().await;
1819 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1820 let mut lines = vec![format!(
1821 "active={}, pending={}, external_pending={}",
1822 stats.total_active, stats.total_pending, stats.external_pending
1823 )];
1824 let mut lanes = stats
1825 .lanes
1826 .into_values()
1827 .filter(|lane| lane.active > 0 || lane.pending > 0)
1828 .map(|lane| {
1829 format!(
1830 "- {:?}: active={}, pending={}, handler={:?}",
1831 lane.lane, lane.active, lane.pending, lane.handler_mode
1832 )
1833 })
1834 .collect::<Vec<_>>();
1835 lanes.sort();
1836 lines.extend(lanes);
1837 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1838 }
1839
1840 let external_tasks = queue.pending_external_tasks().await;
1841 if !external_tasks.is_empty() {
1842 let mut lines = external_tasks
1843 .into_iter()
1844 .take(6)
1845 .map(|task| {
1846 let payload_summary = Self::compact_json_value(&task.payload);
1847 if payload_summary.is_empty() {
1848 format!(
1849 "- {} {:?} remaining={}ms",
1850 task.command_type,
1851 task.lane,
1852 task.remaining_ms()
1853 )
1854 } else {
1855 format!(
1856 "- {} {:?} remaining={}ms {}",
1857 task.command_type,
1858 task.lane,
1859 task.remaining_ms(),
1860 payload_summary
1861 )
1862 }
1863 })
1864 .collect::<Vec<_>>();
1865 lines.sort();
1866 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1867 }
1868 }
1869
1870 if let Some(store) = &self.session_store {
1871 if let Ok(Some(session)) = store.load(&self.session_id).await {
1872 let active_tasks = session
1873 .tasks
1874 .into_iter()
1875 .filter(|task| task.status.is_active())
1876 .take(6)
1877 .map(|task| match task.tool {
1878 Some(tool) if !tool.is_empty() => {
1879 format!("- [{}] {} ({})", task.status, task.content, tool)
1880 }
1881 _ => format!("- [{}] {}", task.status, task.content),
1882 })
1883 .collect::<Vec<_>>();
1884 if !active_tasks.is_empty() {
1885 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1886 }
1887 }
1888 }
1889
1890 sections.join("\n\n")
1891 }
1892
1893 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
1911 self.btw_with_context(question, None).await
1912 }
1913
1914 pub async fn btw_with_context(
1919 &self,
1920 question: &str,
1921 runtime_context: Option<&str>,
1922 ) -> Result<BtwResult> {
1923 let question = question.trim();
1924 if question.is_empty() {
1925 return Err(crate::error::CodeError::Session(
1926 "btw: question cannot be empty".to_string(),
1927 ));
1928 }
1929
1930 let history_snapshot = read_or_recover(&self.history).clone();
1932
1933 let mut messages = history_snapshot;
1935 let mut injected_sections = Vec::new();
1936 let session_runtime = self.build_btw_runtime_context().await;
1937 if !session_runtime.is_empty() {
1938 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1939 }
1940 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1941 injected_sections.push(format!("[host runtime context]\n{}", extra));
1942 }
1943 if !injected_sections.is_empty() {
1944 let injected_context = format!(
1945 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1946 injected_sections.join("\n\n")
1947 );
1948 messages.push(Message::user(&injected_context));
1949 }
1950 messages.push(Message::user(question));
1951
1952 let response = self
1953 .llm_client
1954 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1955 .await
1956 .map_err(|e| {
1957 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
1958 })?;
1959
1960 Ok(BtwResult {
1961 question: question.to_string(),
1962 answer: response.text(),
1963 usage: response.usage,
1964 })
1965 }
1966
1967 pub async fn send_with_attachments(
1972 &self,
1973 prompt: &str,
1974 attachments: &[crate::llm::Attachment],
1975 history: Option<&[Message]>,
1976 ) -> Result<AgentResult> {
1977 let use_internal = history.is_none();
1981 let mut effective_history = match history {
1982 Some(h) => h.to_vec(),
1983 None => read_or_recover(&self.history).clone(),
1984 };
1985 effective_history.push(Message::user_with_attachments(prompt, attachments));
1986
1987 let agent_loop = self.build_agent_loop();
1988 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1989 let runtime_state = Arc::clone(&self.active_tools);
1990 let runtime_collector = tokio::spawn(async move {
1991 while let Some(event) = runtime_rx.recv().await {
1992 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1993 }
1994 });
1995 let result = agent_loop
1996 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
1997 .await?;
1998 let _ = runtime_collector.await;
1999
2000 if use_internal {
2001 *write_or_recover(&self.history) = result.messages.clone();
2002 if self.auto_save {
2003 if let Err(e) = self.save().await {
2004 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2005 }
2006 }
2007 }
2008
2009 self.clear_runtime_tracking().await;
2010
2011 Ok(result)
2012 }
2013
2014 pub async fn stream_with_attachments(
2019 &self,
2020 prompt: &str,
2021 attachments: &[crate::llm::Attachment],
2022 history: Option<&[Message]>,
2023 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2024 let (tx, rx) = mpsc::channel(256);
2025 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2026 let mut effective_history = match history {
2027 Some(h) => h.to_vec(),
2028 None => read_or_recover(&self.history).clone(),
2029 };
2030 effective_history.push(Message::user_with_attachments(prompt, attachments));
2031
2032 let agent_loop = self.build_agent_loop();
2033 let runtime_state = Arc::clone(&self.active_tools);
2034 let forwarder = tokio::spawn(async move {
2035 let mut forward_enabled = true;
2036 while let Some(event) = runtime_rx.recv().await {
2037 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2038 if forward_enabled && tx.send(event).await.is_err() {
2039 forward_enabled = false;
2040 }
2041 }
2042 });
2043 let handle = tokio::spawn(async move {
2044 let _ = agent_loop
2045 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2046 .await;
2047 });
2048 let active_tools = Arc::clone(&self.active_tools);
2049 let wrapped_handle = tokio::spawn(async move {
2050 let _ = handle.await;
2051 let _ = forwarder.await;
2052 active_tools.write().await.clear();
2053 });
2054
2055 Ok((rx, wrapped_handle))
2056 }
2057
2058 pub async fn stream(
2068 &self,
2069 prompt: &str,
2070 history: Option<&[Message]>,
2071 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2072 self.ensure_cron_started();
2073
2074 if CommandRegistry::is_command(prompt) {
2076 let ctx = self.build_command_context();
2077 let output = self.command_registry().dispatch(prompt, &ctx);
2078 if let Some(output) = output {
2080 let (tx, rx) = mpsc::channel(256);
2081
2082 if let Some(CommandAction::BtwQuery(question)) = output.action {
2084 let llm_client = self.llm_client.clone();
2086 let history_snapshot = read_or_recover(&self.history).clone();
2087 let handle = tokio::spawn(async move {
2088 let mut messages = history_snapshot;
2089 messages.push(Message::user(&question));
2090 match llm_client
2091 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2092 .await
2093 {
2094 Ok(response) => {
2095 let answer = response.text();
2096 let _ = tx
2097 .send(AgentEvent::BtwAnswer {
2098 question: question.clone(),
2099 answer: answer.clone(),
2100 usage: response.usage,
2101 })
2102 .await;
2103 let _ = tx
2104 .send(AgentEvent::End {
2105 text: answer,
2106 usage: crate::llm::TokenUsage::default(),
2107 meta: None,
2108 })
2109 .await;
2110 }
2111 Err(e) => {
2112 let _ = tx
2113 .send(AgentEvent::Error {
2114 message: format!("btw failed: {e}"),
2115 })
2116 .await;
2117 }
2118 }
2119 });
2120 return Ok((rx, handle));
2121 }
2122
2123 let handle = tokio::spawn(async move {
2124 let _ = tx
2125 .send(AgentEvent::TextDelta {
2126 text: output.text.clone(),
2127 })
2128 .await;
2129 let _ = tx
2130 .send(AgentEvent::End {
2131 text: output.text.clone(),
2132 usage: crate::llm::TokenUsage::default(),
2133 meta: None,
2134 })
2135 .await;
2136 });
2137 return Ok((rx, handle));
2138 }
2139 }
2140
2141 let (tx, rx) = mpsc::channel(256);
2142 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2143 let agent_loop = self.build_agent_loop();
2144 let effective_history = match history {
2145 Some(h) => h.to_vec(),
2146 None => read_or_recover(&self.history).clone(),
2147 };
2148 let prompt = prompt.to_string();
2149 let session_id = self.session_id.clone();
2150
2151 let cancel_token = tokio_util::sync::CancellationToken::new();
2152 *self.cancel_token.lock().await = Some(cancel_token.clone());
2153 let token_clone = cancel_token.clone();
2154 let runtime_state = Arc::clone(&self.active_tools);
2155 let forwarder = tokio::spawn(async move {
2156 let mut forward_enabled = true;
2157 while let Some(event) = runtime_rx.recv().await {
2158 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2159 if forward_enabled && tx.send(event).await.is_err() {
2160 forward_enabled = false;
2161 }
2162 }
2163 });
2164
2165 let handle = tokio::spawn(async move {
2166 let _ = agent_loop
2167 .execute_with_session(
2168 &effective_history,
2169 &prompt,
2170 Some(&session_id),
2171 Some(runtime_tx),
2172 Some(&token_clone),
2173 )
2174 .await;
2175 });
2176
2177 let cancel_token_ref = self.cancel_token.clone();
2179 let active_tools = Arc::clone(&self.active_tools);
2180 let wrapped_handle = tokio::spawn(async move {
2181 let _ = handle.await;
2182 let _ = forwarder.await;
2183 *cancel_token_ref.lock().await = None;
2184 active_tools.write().await.clear();
2185 });
2186
2187 Ok((rx, wrapped_handle))
2188 }
2189
2190 pub async fn cancel(&self) -> bool {
2197 let token = self.cancel_token.lock().await.clone();
2198 if let Some(token) = token {
2199 token.cancel();
2200 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2201 true
2202 } else {
2203 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2204 false
2205 }
2206 }
2207
2208 pub fn history(&self) -> Vec<Message> {
2210 read_or_recover(&self.history).clone()
2211 }
2212
2213 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2215 self.memory.as_ref()
2216 }
2217
2218 pub fn id(&self) -> &str {
2220 &self.session_id
2221 }
2222
2223 pub fn workspace(&self) -> &std::path::Path {
2225 &self.workspace
2226 }
2227
2228 pub fn init_warning(&self) -> Option<&str> {
2230 self.init_warning.as_deref()
2231 }
2232
2233 pub fn session_id(&self) -> &str {
2235 &self.session_id
2236 }
2237
2238 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2244 self.tool_executor.definitions()
2245 }
2246
2247 pub fn tool_names(&self) -> Vec<String> {
2253 self.tool_executor
2254 .definitions()
2255 .into_iter()
2256 .map(|t| t.name)
2257 .collect()
2258 }
2259
2260 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2266 self.hook_engine.register(hook);
2267 }
2268
2269 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2271 self.hook_engine.unregister(hook_id)
2272 }
2273
2274 pub fn register_hook_handler(
2276 &self,
2277 hook_id: &str,
2278 handler: Arc<dyn crate::hooks::HookHandler>,
2279 ) {
2280 self.hook_engine.register_handler(hook_id, handler);
2281 }
2282
2283 pub fn unregister_hook_handler(&self, hook_id: &str) {
2285 self.hook_engine.unregister_handler(hook_id);
2286 }
2287
2288 pub fn hook_count(&self) -> usize {
2290 self.hook_engine.hook_count()
2291 }
2292
2293 pub async fn save(&self) -> Result<()> {
2297 let store = match &self.session_store {
2298 Some(s) => s,
2299 None => return Ok(()),
2300 };
2301
2302 let history = read_or_recover(&self.history).clone();
2303 let now = chrono::Utc::now().timestamp();
2304
2305 let data = crate::store::SessionData {
2306 id: self.session_id.clone(),
2307 config: crate::session::SessionConfig {
2308 name: String::new(),
2309 workspace: self.workspace.display().to_string(),
2310 system_prompt: Some(self.config.prompt_slots.build()),
2311 max_context_length: 200_000,
2312 auto_compact: false,
2313 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
2314 storage_type: crate::config::StorageBackend::File,
2315 queue_config: None,
2316 confirmation_policy: None,
2317 permission_policy: None,
2318 parent_id: None,
2319 security_config: None,
2320 hook_engine: None,
2321 planning_enabled: self.config.planning_enabled,
2322 goal_tracking: self.config.goal_tracking,
2323 },
2324 state: crate::session::SessionState::Active,
2325 messages: history,
2326 context_usage: crate::session::ContextUsage::default(),
2327 total_usage: crate::llm::TokenUsage::default(),
2328 total_cost: 0.0,
2329 model_name: None,
2330 cost_records: Vec::new(),
2331 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
2332 thinking_enabled: false,
2333 thinking_budget: None,
2334 created_at: now,
2335 updated_at: now,
2336 llm_config: None,
2337 tasks: Vec::new(),
2338 parent_id: None,
2339 };
2340
2341 store.save(&data).await?;
2342 tracing::debug!("Session {} saved", self.session_id);
2343 Ok(())
2344 }
2345
2346 pub async fn read_file(&self, path: &str) -> Result<String> {
2348 let args = serde_json::json!({ "file_path": path });
2349 let result = self.tool_executor.execute("read", &args).await?;
2350 Ok(result.output)
2351 }
2352
2353 pub async fn bash(&self, command: &str) -> Result<String> {
2358 let args = serde_json::json!({ "command": command });
2359 let result = self
2360 .tool_executor
2361 .execute_with_context("bash", &args, &self.tool_context)
2362 .await?;
2363 Ok(result.output)
2364 }
2365
2366 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2368 let args = serde_json::json!({ "pattern": pattern });
2369 let result = self.tool_executor.execute("glob", &args).await?;
2370 let files: Vec<String> = result
2371 .output
2372 .lines()
2373 .filter(|l| !l.is_empty())
2374 .map(|l| l.to_string())
2375 .collect();
2376 Ok(files)
2377 }
2378
2379 pub async fn grep(&self, pattern: &str) -> Result<String> {
2381 let args = serde_json::json!({ "pattern": pattern });
2382 let result = self.tool_executor.execute("grep", &args).await?;
2383 Ok(result.output)
2384 }
2385
2386 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2388 let result = self.tool_executor.execute(name, &args).await?;
2389 Ok(ToolCallResult {
2390 name: name.to_string(),
2391 output: result.output,
2392 exit_code: result.exit_code,
2393 })
2394 }
2395
2396 pub fn has_queue(&self) -> bool {
2402 self.command_queue.is_some()
2403 }
2404
2405 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2409 if let Some(ref queue) = self.command_queue {
2410 queue.set_lane_handler(lane, config).await;
2411 }
2412 }
2413
2414 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2418 if let Some(ref queue) = self.command_queue {
2419 queue.complete_external_task(task_id, result).await
2420 } else {
2421 false
2422 }
2423 }
2424
2425 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2427 if let Some(ref queue) = self.command_queue {
2428 queue.pending_external_tasks().await
2429 } else {
2430 Vec::new()
2431 }
2432 }
2433
2434 pub async fn queue_stats(&self) -> SessionQueueStats {
2436 if let Some(ref queue) = self.command_queue {
2437 queue.stats().await
2438 } else {
2439 SessionQueueStats::default()
2440 }
2441 }
2442
2443 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2445 if let Some(ref queue) = self.command_queue {
2446 queue.metrics_snapshot().await
2447 } else {
2448 None
2449 }
2450 }
2451
2452 pub async fn submit(
2458 &self,
2459 lane: SessionLane,
2460 command: Box<dyn crate::queue::SessionCommand>,
2461 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2462 let queue = self
2463 .command_queue
2464 .as_ref()
2465 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2466 Ok(queue.submit(lane, command).await)
2467 }
2468
2469 pub async fn submit_batch(
2476 &self,
2477 lane: SessionLane,
2478 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2479 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2480 {
2481 let queue = self
2482 .command_queue
2483 .as_ref()
2484 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2485 Ok(queue.submit_batch(lane, commands).await)
2486 }
2487
2488 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2490 if let Some(ref queue) = self.command_queue {
2491 queue.dead_letters().await
2492 } else {
2493 Vec::new()
2494 }
2495 }
2496
2497 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2510 use crate::subagent::load_agents_from_dir;
2511 let agents = load_agents_from_dir(dir);
2512 let count = agents.len();
2513 for agent in agents {
2514 tracing::info!(
2515 session_id = %self.session_id,
2516 agent = agent.name,
2517 dir = %dir.display(),
2518 "Dynamically registered agent"
2519 );
2520 self.agent_registry.register(agent);
2521 }
2522 count
2523 }
2524
2525 pub async fn add_mcp_server(
2532 &self,
2533 config: crate::mcp::McpServerConfig,
2534 ) -> crate::error::Result<usize> {
2535 let server_name = config.name.clone();
2536 self.mcp_manager.register_server(config).await;
2537 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2538 crate::error::CodeError::Tool {
2539 tool: server_name.clone(),
2540 message: format!("Failed to connect MCP server: {}", e),
2541 }
2542 })?;
2543
2544 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2545 let count = tools.len();
2546
2547 for tool in
2548 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2549 {
2550 self.tool_executor.register_dynamic_tool(tool);
2551 }
2552
2553 tracing::info!(
2554 session_id = %self.session_id,
2555 server = server_name,
2556 tools = count,
2557 "MCP server added to live session"
2558 );
2559
2560 Ok(count)
2561 }
2562
2563 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2568 self.tool_executor
2569 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2570 self.mcp_manager
2571 .disconnect(server_name)
2572 .await
2573 .map_err(|e| crate::error::CodeError::Tool {
2574 tool: server_name.to_string(),
2575 message: format!("Failed to disconnect MCP server: {}", e),
2576 })?;
2577 tracing::info!(
2578 session_id = %self.session_id,
2579 server = server_name,
2580 "MCP server removed from live session"
2581 );
2582 Ok(())
2583 }
2584
2585 pub async fn mcp_status(
2587 &self,
2588 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2589 self.mcp_manager.get_status().await
2590 }
2591}
2592
2593#[cfg(test)]
2598mod tests {
2599 use super::*;
2600 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2601 use crate::store::SessionStore;
2602
2603 #[tokio::test]
2604 async fn test_session_submit_no_queue_returns_err() {
2605 let agent = Agent::from_config(test_config()).await.unwrap();
2606 let session = agent.session(".", None).unwrap();
2607 struct Noop;
2608 #[async_trait::async_trait]
2609 impl crate::queue::SessionCommand for Noop {
2610 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2611 Ok(serde_json::json!(null))
2612 }
2613 fn command_type(&self) -> &str {
2614 "noop"
2615 }
2616 }
2617 let result: anyhow::Result<
2618 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2619 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2620 assert!(result.is_err());
2621 assert!(result.unwrap_err().to_string().contains("No queue"));
2622 }
2623
2624 #[tokio::test]
2625 async fn test_session_submit_batch_no_queue_returns_err() {
2626 let agent = Agent::from_config(test_config()).await.unwrap();
2627 let session = agent.session(".", None).unwrap();
2628 struct Noop;
2629 #[async_trait::async_trait]
2630 impl crate::queue::SessionCommand for Noop {
2631 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2632 Ok(serde_json::json!(null))
2633 }
2634 fn command_type(&self) -> &str {
2635 "noop"
2636 }
2637 }
2638 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2639 let result: anyhow::Result<
2640 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2641 > = session.submit_batch(SessionLane::Query, cmds).await;
2642 assert!(result.is_err());
2643 assert!(result.unwrap_err().to_string().contains("No queue"));
2644 }
2645
2646 fn test_config() -> CodeConfig {
2647 CodeConfig {
2648 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2649 providers: vec![
2650 ProviderConfig {
2651 name: "anthropic".to_string(),
2652 api_key: Some("test-key".to_string()),
2653 base_url: None,
2654 models: vec![ModelConfig {
2655 id: "claude-sonnet-4-20250514".to_string(),
2656 name: "Claude Sonnet 4".to_string(),
2657 family: "claude-sonnet".to_string(),
2658 api_key: None,
2659 base_url: None,
2660 attachment: false,
2661 reasoning: false,
2662 tool_call: true,
2663 temperature: true,
2664 release_date: None,
2665 modalities: ModelModalities::default(),
2666 cost: Default::default(),
2667 limit: Default::default(),
2668 }],
2669 },
2670 ProviderConfig {
2671 name: "openai".to_string(),
2672 api_key: Some("test-openai-key".to_string()),
2673 base_url: None,
2674 models: vec![ModelConfig {
2675 id: "gpt-4o".to_string(),
2676 name: "GPT-4o".to_string(),
2677 family: "gpt-4".to_string(),
2678 api_key: None,
2679 base_url: None,
2680 attachment: false,
2681 reasoning: false,
2682 tool_call: true,
2683 temperature: true,
2684 release_date: None,
2685 modalities: ModelModalities::default(),
2686 cost: Default::default(),
2687 limit: Default::default(),
2688 }],
2689 },
2690 ],
2691 ..Default::default()
2692 }
2693 }
2694
2695 fn build_effective_registry_for_test(
2696 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2697 opts: &SessionOptions,
2698 ) -> Arc<crate::skills::SkillRegistry> {
2699 let base_registry = agent_registry
2700 .as_deref()
2701 .map(|r| r.fork())
2702 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2703 if let Some(ref r) = opts.skill_registry {
2704 for skill in r.all() {
2705 base_registry.register_unchecked(skill);
2706 }
2707 }
2708 for dir in &opts.skill_dirs {
2709 if let Err(e) = base_registry.load_from_dir(dir) {
2710 tracing::warn!(
2711 dir = %dir.display(),
2712 error = %e,
2713 "Failed to load session skill dir — skipping"
2714 );
2715 }
2716 }
2717 Arc::new(base_registry)
2718 }
2719
2720 #[tokio::test]
2721 async fn test_from_config() {
2722 let agent = Agent::from_config(test_config()).await;
2723 assert!(agent.is_ok());
2724 }
2725
2726 #[tokio::test]
2727 async fn test_session_default() {
2728 let agent = Agent::from_config(test_config()).await.unwrap();
2729 let session = agent.session("/tmp/test-workspace", None);
2730 assert!(session.is_ok());
2731 let debug = format!("{:?}", session.unwrap());
2732 assert!(debug.contains("AgentSession"));
2733 }
2734
2735 #[tokio::test]
2736 async fn test_session_with_model_override() {
2737 let agent = Agent::from_config(test_config()).await.unwrap();
2738 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2739 let session = agent.session("/tmp/test-workspace", Some(opts));
2740 assert!(session.is_ok());
2741 }
2742
2743 #[tokio::test]
2744 async fn test_session_with_invalid_model_format() {
2745 let agent = Agent::from_config(test_config()).await.unwrap();
2746 let opts = SessionOptions::new().with_model("gpt-4o");
2747 let session = agent.session("/tmp/test-workspace", Some(opts));
2748 assert!(session.is_err());
2749 }
2750
2751 #[tokio::test]
2752 async fn test_session_with_model_not_found() {
2753 let agent = Agent::from_config(test_config()).await.unwrap();
2754 let opts = SessionOptions::new().with_model("openai/nonexistent");
2755 let session = agent.session("/tmp/test-workspace", Some(opts));
2756 assert!(session.is_err());
2757 }
2758
2759 #[tokio::test]
2760 async fn test_session_preserves_skill_scorer_from_agent_registry() {
2761 use crate::skills::feedback::{
2762 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
2763 };
2764 use crate::skills::{Skill, SkillKind, SkillRegistry};
2765
2766 let registry = Arc::new(SkillRegistry::new());
2767 let scorer = Arc::new(DefaultSkillScorer::default());
2768 registry.set_scorer(scorer.clone());
2769
2770 registry.register_unchecked(Arc::new(Skill {
2771 name: "healthy-skill".to_string(),
2772 description: "healthy".to_string(),
2773 allowed_tools: None,
2774 disable_model_invocation: false,
2775 kind: SkillKind::Instruction,
2776 content: "healthy".to_string(),
2777 tags: vec![],
2778 version: None,
2779 }));
2780 registry.register_unchecked(Arc::new(Skill {
2781 name: "disabled-skill".to_string(),
2782 description: "disabled".to_string(),
2783 allowed_tools: None,
2784 disable_model_invocation: false,
2785 kind: SkillKind::Instruction,
2786 content: "disabled".to_string(),
2787 tags: vec![],
2788 version: None,
2789 }));
2790
2791 for _ in 0..5 {
2792 scorer.record(SkillFeedback {
2793 skill_name: "disabled-skill".to_string(),
2794 outcome: SkillOutcome::Failure,
2795 score_delta: -1.0,
2796 reason: "bad".to_string(),
2797 timestamp: 0,
2798 });
2799 }
2800
2801 let effective_registry =
2802 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
2803 let prompt = effective_registry.to_system_prompt();
2804
2805 assert!(prompt.contains("healthy-skill"));
2806 assert!(!prompt.contains("disabled-skill"));
2807 }
2808
2809 #[tokio::test]
2810 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
2811 use crate::skills::validator::DefaultSkillValidator;
2812 use crate::skills::SkillRegistry;
2813
2814 let registry = Arc::new(SkillRegistry::new());
2815 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
2816
2817 let temp_dir = tempfile::tempdir().unwrap();
2818 let invalid_skill = temp_dir.path().join("invalid.md");
2819 std::fs::write(
2820 &invalid_skill,
2821 r#"---
2822name: BadName
2823description: "invalid skill name"
2824kind: instruction
2825---
2826# Invalid Skill
2827"#,
2828 )
2829 .unwrap();
2830
2831 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
2832 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
2833 assert!(effective_registry.get("BadName").is_none());
2834 }
2835
2836 #[tokio::test]
2837 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
2838 use crate::skills::{Skill, SkillKind, SkillRegistry};
2839
2840 let registry = Arc::new(SkillRegistry::new());
2841 registry.register_unchecked(Arc::new(Skill {
2842 name: "shared-skill".to_string(),
2843 description: "agent level".to_string(),
2844 allowed_tools: None,
2845 disable_model_invocation: false,
2846 kind: SkillKind::Instruction,
2847 content: "agent content".to_string(),
2848 tags: vec![],
2849 version: None,
2850 }));
2851
2852 let session_registry = Arc::new(SkillRegistry::new());
2853 session_registry.register_unchecked(Arc::new(Skill {
2854 name: "shared-skill".to_string(),
2855 description: "session level".to_string(),
2856 allowed_tools: None,
2857 disable_model_invocation: false,
2858 kind: SkillKind::Instruction,
2859 content: "session content".to_string(),
2860 tags: vec![],
2861 version: None,
2862 }));
2863
2864 let opts = SessionOptions::new().with_skill_registry(session_registry);
2865 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
2866
2867 assert_eq!(
2868 effective_registry.get("shared-skill").unwrap().content,
2869 "session content"
2870 );
2871 assert_eq!(
2872 registry.get("shared-skill").unwrap().content,
2873 "agent content"
2874 );
2875 }
2876
2877 #[tokio::test]
2878 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
2879 use crate::skills::{Skill, SkillKind, SkillRegistry};
2880
2881 let session_registry = Arc::new(SkillRegistry::new());
2882 session_registry.register_unchecked(Arc::new(Skill {
2883 name: "shared-skill".to_string(),
2884 description: "session registry".to_string(),
2885 allowed_tools: None,
2886 disable_model_invocation: false,
2887 kind: SkillKind::Instruction,
2888 content: "registry content".to_string(),
2889 tags: vec![],
2890 version: None,
2891 }));
2892
2893 let temp_dir = tempfile::tempdir().unwrap();
2894 std::fs::write(
2895 temp_dir.path().join("shared.md"),
2896 r#"---
2897name: shared-skill
2898description: "skill dir override"
2899kind: instruction
2900---
2901# Shared Skill
2902dir content
2903"#,
2904 )
2905 .unwrap();
2906 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
2907
2908 let opts = SessionOptions::new()
2909 .with_skill_registry(session_registry)
2910 .with_skill_dirs([temp_dir.path()]);
2911 let effective_registry = build_effective_registry_for_test(None, &opts);
2912
2913 assert_eq!(
2914 effective_registry.get("shared-skill").unwrap().description,
2915 "skill dir override"
2916 );
2917 assert!(effective_registry.get("README").is_none());
2918 }
2919
2920 #[tokio::test]
2921 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
2922 use crate::plugin::{Plugin, PluginContext};
2923 use crate::skills::{Skill, SkillKind, SkillRegistry};
2924 use crate::tools::ToolRegistry;
2925
2926 struct SessionOnlySkillPlugin;
2927
2928 impl Plugin for SessionOnlySkillPlugin {
2929 fn name(&self) -> &str {
2930 "session-only-skill"
2931 }
2932
2933 fn version(&self) -> &str {
2934 "0.1.0"
2935 }
2936
2937 fn tool_names(&self) -> &[&str] {
2938 &[]
2939 }
2940
2941 fn load(
2942 &self,
2943 _registry: &Arc<ToolRegistry>,
2944 _ctx: &PluginContext,
2945 ) -> anyhow::Result<()> {
2946 Ok(())
2947 }
2948
2949 fn skills(&self) -> Vec<Arc<Skill>> {
2950 vec![Arc::new(Skill {
2951 name: "plugin-session-skill".to_string(),
2952 description: "plugin skill".to_string(),
2953 allowed_tools: None,
2954 disable_model_invocation: false,
2955 kind: SkillKind::Instruction,
2956 content: "plugin content".to_string(),
2957 tags: vec!["plugin".to_string()],
2958 version: None,
2959 })]
2960 }
2961 }
2962
2963 let mut agent = Agent::from_config(test_config()).await.unwrap();
2964 let agent_registry = Arc::new(SkillRegistry::with_builtins());
2965 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
2966
2967 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
2968 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
2969
2970 let session_registry = session.config.skill_registry.as_ref().unwrap();
2971 assert!(session_registry.get("plugin-session-skill").is_some());
2972 assert!(agent_registry.get("plugin-session-skill").is_none());
2973 }
2974
2975 #[tokio::test]
2976 async fn test_session_specific_skills_do_not_leak_across_sessions() {
2977 use crate::skills::{Skill, SkillKind, SkillRegistry};
2978
2979 let mut agent = Agent::from_config(test_config()).await.unwrap();
2980 let agent_registry = Arc::new(SkillRegistry::with_builtins());
2981 agent.config.skill_registry = Some(agent_registry);
2982
2983 let session_registry = Arc::new(SkillRegistry::new());
2984 session_registry.register_unchecked(Arc::new(Skill {
2985 name: "session-only".to_string(),
2986 description: "only for first session".to_string(),
2987 allowed_tools: None,
2988 disable_model_invocation: false,
2989 kind: SkillKind::Instruction,
2990 content: "session one".to_string(),
2991 tags: vec![],
2992 version: None,
2993 }));
2994
2995 let session_one = agent
2996 .session(
2997 "/tmp/test-workspace",
2998 Some(SessionOptions::new().with_skill_registry(session_registry)),
2999 )
3000 .unwrap();
3001 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3002
3003 assert!(session_one
3004 .config
3005 .skill_registry
3006 .as_ref()
3007 .unwrap()
3008 .get("session-only")
3009 .is_some());
3010 assert!(session_two
3011 .config
3012 .skill_registry
3013 .as_ref()
3014 .unwrap()
3015 .get("session-only")
3016 .is_none());
3017 }
3018
3019 #[tokio::test]
3020 async fn test_plugin_skills_do_not_leak_across_sessions() {
3021 use crate::plugin::{Plugin, PluginContext};
3022 use crate::skills::{Skill, SkillKind, SkillRegistry};
3023 use crate::tools::ToolRegistry;
3024
3025 struct LeakyPlugin;
3026
3027 impl Plugin for LeakyPlugin {
3028 fn name(&self) -> &str {
3029 "leaky-plugin"
3030 }
3031
3032 fn version(&self) -> &str {
3033 "0.1.0"
3034 }
3035
3036 fn tool_names(&self) -> &[&str] {
3037 &[]
3038 }
3039
3040 fn load(
3041 &self,
3042 _registry: &Arc<ToolRegistry>,
3043 _ctx: &PluginContext,
3044 ) -> anyhow::Result<()> {
3045 Ok(())
3046 }
3047
3048 fn skills(&self) -> Vec<Arc<Skill>> {
3049 vec![Arc::new(Skill {
3050 name: "plugin-only".to_string(),
3051 description: "plugin only".to_string(),
3052 allowed_tools: None,
3053 disable_model_invocation: false,
3054 kind: SkillKind::Instruction,
3055 content: "plugin skill".to_string(),
3056 tags: vec![],
3057 version: None,
3058 })]
3059 }
3060 }
3061
3062 let mut agent = Agent::from_config(test_config()).await.unwrap();
3063 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3064
3065 let session_one = agent
3066 .session(
3067 "/tmp/test-workspace",
3068 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3069 )
3070 .unwrap();
3071 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3072
3073 assert!(session_one
3074 .config
3075 .skill_registry
3076 .as_ref()
3077 .unwrap()
3078 .get("plugin-only")
3079 .is_some());
3080 assert!(session_two
3081 .config
3082 .skill_registry
3083 .as_ref()
3084 .unwrap()
3085 .get("plugin-only")
3086 .is_none());
3087 }
3088
3089 #[tokio::test]
3090 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3091 use crate::skills::{Skill, SkillKind, SkillRegistry};
3092 use crate::subagent::AgentDefinition;
3093
3094 let mut agent = Agent::from_config(test_config()).await.unwrap();
3095 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3096
3097 let definition = AgentDefinition::new("reviewer", "Review code")
3098 .with_prompt("Agent definition prompt")
3099 .with_max_steps(7);
3100
3101 let session_registry = Arc::new(SkillRegistry::new());
3102 session_registry.register_unchecked(Arc::new(Skill {
3103 name: "agent-session-skill".to_string(),
3104 description: "agent session only".to_string(),
3105 allowed_tools: None,
3106 disable_model_invocation: false,
3107 kind: SkillKind::Instruction,
3108 content: "agent session content".to_string(),
3109 tags: vec![],
3110 version: None,
3111 }));
3112
3113 let session_one = agent
3114 .session_for_agent(
3115 "/tmp/test-workspace",
3116 &definition,
3117 Some(SessionOptions::new().with_skill_registry(session_registry)),
3118 )
3119 .unwrap();
3120 let session_two = agent
3121 .session_for_agent("/tmp/test-workspace", &definition, None)
3122 .unwrap();
3123
3124 assert_eq!(session_one.config.max_tool_rounds, 7);
3125 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3126 assert!(extra.contains("Agent definition prompt"));
3127 assert!(extra.contains("agent-session-skill"));
3128 assert!(session_one
3129 .config
3130 .skill_registry
3131 .as_ref()
3132 .unwrap()
3133 .get("agent-session-skill")
3134 .is_some());
3135 assert!(session_two
3136 .config
3137 .skill_registry
3138 .as_ref()
3139 .unwrap()
3140 .get("agent-session-skill")
3141 .is_none());
3142 }
3143
3144 #[tokio::test]
3145 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3146 ) {
3147 use crate::prompts::SystemPromptSlots;
3148 use crate::subagent::AgentDefinition;
3149
3150 let agent = Agent::from_config(test_config()).await.unwrap();
3151 let definition = AgentDefinition::new("planner", "Plan work")
3152 .with_prompt("Definition extra prompt")
3153 .with_max_steps(3);
3154
3155 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3156 role: Some("Custom role".to_string()),
3157 guidelines: None,
3158 response_style: None,
3159 extra: None,
3160 });
3161
3162 let session = agent
3163 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3164 .unwrap();
3165
3166 assert_eq!(
3167 session.config.prompt_slots.role.as_deref(),
3168 Some("Custom role")
3169 );
3170 assert!(session
3171 .config
3172 .prompt_slots
3173 .extra
3174 .as_deref()
3175 .unwrap()
3176 .contains("Definition extra prompt"));
3177 assert_eq!(session.config.max_tool_rounds, 3);
3178 }
3179
3180 #[tokio::test]
3181 async fn test_new_with_hcl_string() {
3182 let hcl = r#"
3183 default_model = "anthropic/claude-sonnet-4-20250514"
3184 providers {
3185 name = "anthropic"
3186 api_key = "test-key"
3187 models {
3188 id = "claude-sonnet-4-20250514"
3189 name = "Claude Sonnet 4"
3190 }
3191 }
3192 "#;
3193 let agent = Agent::new(hcl).await;
3194 assert!(agent.is_ok());
3195 }
3196
3197 #[tokio::test]
3198 async fn test_create_alias_hcl() {
3199 let hcl = r#"
3200 default_model = "anthropic/claude-sonnet-4-20250514"
3201 providers {
3202 name = "anthropic"
3203 api_key = "test-key"
3204 models {
3205 id = "claude-sonnet-4-20250514"
3206 name = "Claude Sonnet 4"
3207 }
3208 }
3209 "#;
3210 let agent = Agent::create(hcl).await;
3211 assert!(agent.is_ok());
3212 }
3213
3214 #[tokio::test]
3215 async fn test_create_and_new_produce_same_result() {
3216 let hcl = r#"
3217 default_model = "anthropic/claude-sonnet-4-20250514"
3218 providers {
3219 name = "anthropic"
3220 api_key = "test-key"
3221 models {
3222 id = "claude-sonnet-4-20250514"
3223 name = "Claude Sonnet 4"
3224 }
3225 }
3226 "#;
3227 let agent_new = Agent::new(hcl).await;
3228 let agent_create = Agent::create(hcl).await;
3229 assert!(agent_new.is_ok());
3230 assert!(agent_create.is_ok());
3231
3232 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3234 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3235 assert!(session_new.is_ok());
3236 assert!(session_create.is_ok());
3237 }
3238
3239 #[tokio::test]
3240 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3241 let temp_dir = tempfile::tempdir().unwrap();
3242 let config_path = temp_dir.path().join("agent.hcl");
3243 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3244
3245 let err = Agent::new(config_path.display().to_string())
3246 .await
3247 .unwrap_err();
3248 let msg = err.to_string();
3249
3250 assert!(msg.contains("Failed to load config"));
3251 assert!(msg.contains("agent.hcl"));
3252 assert!(!msg.contains("Failed to parse config as HCL string"));
3253 }
3254
3255 #[tokio::test]
3256 async fn test_new_with_missing_hcl_file_reports_not_found() {
3257 let temp_dir = tempfile::tempdir().unwrap();
3258 let missing_path = temp_dir.path().join("agent.hcl");
3259
3260 let err = Agent::new(missing_path.display().to_string())
3261 .await
3262 .unwrap_err();
3263 let msg = err.to_string();
3264
3265 assert!(msg.contains("Config file not found"));
3266 assert!(msg.contains("agent.hcl"));
3267 assert!(!msg.contains("Failed to parse config as HCL string"));
3268 }
3269
3270 #[test]
3271 fn test_from_config_requires_default_model() {
3272 let rt = tokio::runtime::Runtime::new().unwrap();
3273 let config = CodeConfig {
3274 providers: vec![ProviderConfig {
3275 name: "anthropic".to_string(),
3276 api_key: Some("test-key".to_string()),
3277 base_url: None,
3278 models: vec![],
3279 }],
3280 ..Default::default()
3281 };
3282 let result = rt.block_on(Agent::from_config(config));
3283 assert!(result.is_err());
3284 }
3285
3286 #[tokio::test]
3287 async fn test_history_empty_on_new_session() {
3288 let agent = Agent::from_config(test_config()).await.unwrap();
3289 let session = agent.session("/tmp/test-workspace", None).unwrap();
3290 assert!(session.history().is_empty());
3291 }
3292
/// Repeated `with_agent_dir` calls accumulate directories in call order.
///
/// This is a plain `#[test]`: the body awaits nothing, so it does not need a
/// Tokio runtime.
#[test]
fn test_session_options_with_agent_dir() {
    let opts = SessionOptions::new()
        .with_agent_dir("/tmp/agents")
        .with_agent_dir("/tmp/more-agents");

    // A single comparison checks both the length and the order.
    assert_eq!(
        opts.agent_dirs,
        vec![
            PathBuf::from("/tmp/agents"),
            PathBuf::from("/tmp/more-agents"),
        ]
    );
}
3302
/// `with_queue_config` stores the given config, and a config built with
/// `with_lane_features()` has DLQ, metrics and alerts enabled plus a default
/// timeout of 60 000 ms.
#[test]
fn test_session_options_with_queue_config() {
    let qc = SessionQueueConfig::default().with_lane_features();
    // `qc` is not used again, so it is moved rather than cloned.
    let opts = SessionOptions::new().with_queue_config(qc);
    assert!(opts.queue_config.is_some());

    let config = opts.queue_config.unwrap();
    assert!(config.enable_dlq);
    assert!(config.enable_metrics);
    assert!(config.enable_alerts);
    assert_eq!(config.default_timeout_ms, Some(60_000));
}
3319
3320 #[tokio::test(flavor = "multi_thread")]
3321 async fn test_session_with_queue_config() {
3322 let agent = Agent::from_config(test_config()).await.unwrap();
3323 let qc = SessionQueueConfig::default();
3324 let opts = SessionOptions::new().with_queue_config(qc);
3325 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3326 assert!(session.is_ok());
3327 let session = session.unwrap();
3328 assert!(session.has_queue());
3329 }
3330
3331 #[tokio::test]
3332 async fn test_session_without_queue_config() {
3333 let agent = Agent::from_config(test_config()).await.unwrap();
3334 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3335 assert!(!session.has_queue());
3336 }
3337
3338 #[tokio::test]
3339 async fn test_session_queue_stats_without_queue() {
3340 let agent = Agent::from_config(test_config()).await.unwrap();
3341 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3342 let stats = session.queue_stats().await;
3343 assert_eq!(stats.total_pending, 0);
3345 assert_eq!(stats.total_active, 0);
3346 }
3347
3348 #[tokio::test(flavor = "multi_thread")]
3349 async fn test_session_queue_stats_with_queue() {
3350 let agent = Agent::from_config(test_config()).await.unwrap();
3351 let qc = SessionQueueConfig::default();
3352 let opts = SessionOptions::new().with_queue_config(qc);
3353 let session = agent
3354 .session("/tmp/test-workspace-qstats", Some(opts))
3355 .unwrap();
3356 let stats = session.queue_stats().await;
3357 assert_eq!(stats.total_pending, 0);
3359 assert_eq!(stats.total_active, 0);
3360 }
3361
3362 #[tokio::test(flavor = "multi_thread")]
3363 async fn test_session_pending_external_tasks_empty() {
3364 let agent = Agent::from_config(test_config()).await.unwrap();
3365 let qc = SessionQueueConfig::default();
3366 let opts = SessionOptions::new().with_queue_config(qc);
3367 let session = agent
3368 .session("/tmp/test-workspace-ext", Some(opts))
3369 .unwrap();
3370 let tasks = session.pending_external_tasks().await;
3371 assert!(tasks.is_empty());
3372 }
3373
3374 #[tokio::test(flavor = "multi_thread")]
3375 async fn test_session_dead_letters_empty() {
3376 let agent = Agent::from_config(test_config()).await.unwrap();
3377 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3378 let opts = SessionOptions::new().with_queue_config(qc);
3379 let session = agent
3380 .session("/tmp/test-workspace-dlq", Some(opts))
3381 .unwrap();
3382 let dead = session.dead_letters().await;
3383 assert!(dead.is_empty());
3384 }
3385
3386 #[tokio::test(flavor = "multi_thread")]
3387 async fn test_session_queue_metrics_disabled() {
3388 let agent = Agent::from_config(test_config()).await.unwrap();
3389 let qc = SessionQueueConfig::default();
3391 let opts = SessionOptions::new().with_queue_config(qc);
3392 let session = agent
3393 .session("/tmp/test-workspace-nomet", Some(opts))
3394 .unwrap();
3395 let metrics = session.queue_metrics().await;
3396 assert!(metrics.is_none());
3397 }
3398
3399 #[tokio::test(flavor = "multi_thread")]
3400 async fn test_session_queue_metrics_enabled() {
3401 let agent = Agent::from_config(test_config()).await.unwrap();
3402 let qc = SessionQueueConfig::default().with_metrics();
3403 let opts = SessionOptions::new().with_queue_config(qc);
3404 let session = agent
3405 .session("/tmp/test-workspace-met", Some(opts))
3406 .unwrap();
3407 let metrics = session.queue_metrics().await;
3408 assert!(metrics.is_some());
3409 }
3410
3411 #[tokio::test(flavor = "multi_thread")]
3412 async fn test_session_set_lane_handler() {
3413 let agent = Agent::from_config(test_config()).await.unwrap();
3414 let qc = SessionQueueConfig::default();
3415 let opts = SessionOptions::new().with_queue_config(qc);
3416 let session = agent
3417 .session("/tmp/test-workspace-handler", Some(opts))
3418 .unwrap();
3419
3420 session
3422 .set_lane_handler(
3423 SessionLane::Execute,
3424 LaneHandlerConfig {
3425 mode: crate::queue::TaskHandlerMode::External,
3426 timeout_ms: 30_000,
3427 },
3428 )
3429 .await;
3430
3431 }
3434
3435 #[tokio::test(flavor = "multi_thread")]
3440 async fn test_session_has_id() {
3441 let agent = Agent::from_config(test_config()).await.unwrap();
3442 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3443 assert!(!session.session_id().is_empty());
3445 assert_eq!(session.session_id().len(), 36); }
3447
3448 #[tokio::test(flavor = "multi_thread")]
3449 async fn test_session_explicit_id() {
3450 let agent = Agent::from_config(test_config()).await.unwrap();
3451 let opts = SessionOptions::new().with_session_id("my-session-42");
3452 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3453 assert_eq!(session.session_id(), "my-session-42");
3454 }
3455
3456 #[tokio::test(flavor = "multi_thread")]
3457 async fn test_session_save_no_store() {
3458 let agent = Agent::from_config(test_config()).await.unwrap();
3459 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3460 session.save().await.unwrap();
3462 }
3463
3464 #[tokio::test(flavor = "multi_thread")]
3465 async fn test_session_save_and_load() {
3466 let store = Arc::new(crate::store::MemorySessionStore::new());
3467 let agent = Agent::from_config(test_config()).await.unwrap();
3468
3469 let opts = SessionOptions::new()
3470 .with_session_store(store.clone())
3471 .with_session_id("persist-test");
3472 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3473
3474 session.save().await.unwrap();
3476
3477 assert!(store.exists("persist-test").await.unwrap());
3479
3480 let data = store.load("persist-test").await.unwrap().unwrap();
3481 assert_eq!(data.id, "persist-test");
3482 assert!(data.messages.is_empty());
3483 }
3484
3485 #[tokio::test(flavor = "multi_thread")]
3486 async fn test_session_save_with_history() {
3487 let store = Arc::new(crate::store::MemorySessionStore::new());
3488 let agent = Agent::from_config(test_config()).await.unwrap();
3489
3490 let opts = SessionOptions::new()
3491 .with_session_store(store.clone())
3492 .with_session_id("history-test");
3493 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3494
3495 {
3497 let mut h = session.history.write().unwrap();
3498 h.push(Message::user("Hello"));
3499 h.push(Message::user("How are you?"));
3500 }
3501
3502 session.save().await.unwrap();
3503
3504 let data = store.load("history-test").await.unwrap().unwrap();
3505 assert_eq!(data.messages.len(), 2);
3506 }
3507
3508 #[tokio::test(flavor = "multi_thread")]
3509 async fn test_resume_session() {
3510 let store = Arc::new(crate::store::MemorySessionStore::new());
3511 let agent = Agent::from_config(test_config()).await.unwrap();
3512
3513 let opts = SessionOptions::new()
3515 .with_session_store(store.clone())
3516 .with_session_id("resume-test");
3517 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3518 {
3519 let mut h = session.history.write().unwrap();
3520 h.push(Message::user("What is Rust?"));
3521 h.push(Message::user("Tell me more"));
3522 }
3523 session.save().await.unwrap();
3524
3525 let opts2 = SessionOptions::new().with_session_store(store.clone());
3527 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3528
3529 assert_eq!(resumed.session_id(), "resume-test");
3530 let history = resumed.history();
3531 assert_eq!(history.len(), 2);
3532 assert_eq!(history[0].text(), "What is Rust?");
3533 }
3534
3535 #[tokio::test(flavor = "multi_thread")]
3536 async fn test_resume_session_not_found() {
3537 let store = Arc::new(crate::store::MemorySessionStore::new());
3538 let agent = Agent::from_config(test_config()).await.unwrap();
3539
3540 let opts = SessionOptions::new().with_session_store(store.clone());
3541 let result = agent.resume_session("nonexistent", opts);
3542 assert!(result.is_err());
3543 assert!(result.unwrap_err().to_string().contains("not found"));
3544 }
3545
3546 #[tokio::test(flavor = "multi_thread")]
3547 async fn test_resume_session_no_store() {
3548 let agent = Agent::from_config(test_config()).await.unwrap();
3549 let opts = SessionOptions::new();
3550 let result = agent.resume_session("any-id", opts);
3551 assert!(result.is_err());
3552 assert!(result.unwrap_err().to_string().contains("session_store"));
3553 }
3554
3555 #[tokio::test(flavor = "multi_thread")]
3556 async fn test_file_session_store_persistence() {
3557 let dir = tempfile::TempDir::new().unwrap();
3558 let store = Arc::new(
3559 crate::store::FileSessionStore::new(dir.path())
3560 .await
3561 .unwrap(),
3562 );
3563 let agent = Agent::from_config(test_config()).await.unwrap();
3564
3565 let opts = SessionOptions::new()
3567 .with_session_store(store.clone())
3568 .with_session_id("file-persist");
3569 let session = agent
3570 .session("/tmp/test-ws-file-persist", Some(opts))
3571 .unwrap();
3572 {
3573 let mut h = session.history.write().unwrap();
3574 h.push(Message::user("test message"));
3575 }
3576 session.save().await.unwrap();
3577
3578 let store2 = Arc::new(
3580 crate::store::FileSessionStore::new(dir.path())
3581 .await
3582 .unwrap(),
3583 );
3584 let data = store2.load("file-persist").await.unwrap().unwrap();
3585 assert_eq!(data.messages.len(), 1);
3586 }
3587
/// `with_session_id` and `with_auto_save` store their arguments on the options.
///
/// This is a plain `#[test]`: the body awaits nothing, so it does not need a
/// (multi-threaded) Tokio runtime.
#[test]
fn test_session_options_builders() {
    let opts = SessionOptions::new()
        .with_session_id("test-id")
        .with_auto_save(true);

    // Compare through `as_deref` to avoid allocating a `String` for the expectation.
    assert_eq!(opts.session_id.as_deref(), Some("test-id"));
    assert!(opts.auto_save);
}
3596
/// `with_sandbox` stores the given `SandboxConfig`, preserving its image and
/// memory settings.
#[test]
fn test_session_options_with_sandbox_sets_config() {
    use crate::sandbox::SandboxConfig;

    let requested = SandboxConfig {
        image: "ubuntu:22.04".into(),
        memory_mb: 1024,
        ..SandboxConfig::default()
    };

    let opts = SessionOptions::new().with_sandbox(requested);
    assert!(opts.sandbox_config.is_some());

    let stored = opts.sandbox_config.unwrap();
    assert_eq!(stored.image, "ubuntu:22.04");
    assert_eq!(stored.memory_mb, 1024);
}
3615
/// Default session options carry no sandbox configuration.
#[test]
fn test_session_options_default_has_no_sandbox() {
    let defaults = SessionOptions::default();

    let sandbox = defaults.sandbox_config;

    assert!(sandbox.is_none());
}
3621
/// The `Debug` output of `SessionOptions` includes the `sandbox_config` field.
///
/// This is a plain `#[test]`: the body awaits nothing, so it does not need a
/// Tokio runtime.
#[test]
fn test_session_debug_includes_sandbox_config() {
    use crate::sandbox::SandboxConfig;

    let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());

    let debug = format!("{:?}", opts);

    assert!(debug.contains("sandbox_config"));
}
3629
3630 #[tokio::test]
3631 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3632 let agent = Agent::from_config(test_config()).await.unwrap();
3635 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3636 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3638 assert!(session.is_ok());
3639 }
3640
3641 #[tokio::test(flavor = "multi_thread")]
3646 async fn test_session_with_memory_store() {
3647 use a3s_memory::InMemoryStore;
3648 let store = Arc::new(InMemoryStore::new());
3649 let agent = Agent::from_config(test_config()).await.unwrap();
3650 let opts = SessionOptions::new().with_memory(store);
3651 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3652 assert!(session.memory().is_some());
3653 }
3654
3655 #[tokio::test(flavor = "multi_thread")]
3656 async fn test_session_without_memory_store() {
3657 let agent = Agent::from_config(test_config()).await.unwrap();
3658 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3659 assert!(session.memory().is_none());
3660 }
3661
3662 #[tokio::test(flavor = "multi_thread")]
3663 async fn test_session_memory_wired_into_config() {
3664 use a3s_memory::InMemoryStore;
3665 let store = Arc::new(InMemoryStore::new());
3666 let agent = Agent::from_config(test_config()).await.unwrap();
3667 let opts = SessionOptions::new().with_memory(store);
3668 let session = agent
3669 .session("/tmp/test-ws-mem-config", Some(opts))
3670 .unwrap();
3671 assert!(session.memory().is_some());
3673 }
3674
3675 #[tokio::test(flavor = "multi_thread")]
3676 async fn test_session_with_file_memory() {
3677 let dir = tempfile::TempDir::new().unwrap();
3678 let agent = Agent::from_config(test_config()).await.unwrap();
3679 let opts = SessionOptions::new().with_file_memory(dir.path());
3680 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3681 assert!(session.memory().is_some());
3682 }
3683
3684 #[tokio::test(flavor = "multi_thread")]
3685 async fn test_memory_remember_and_recall() {
3686 use a3s_memory::InMemoryStore;
3687 let store = Arc::new(InMemoryStore::new());
3688 let agent = Agent::from_config(test_config()).await.unwrap();
3689 let opts = SessionOptions::new().with_memory(store);
3690 let session = agent
3691 .session("/tmp/test-ws-mem-recall", Some(opts))
3692 .unwrap();
3693
3694 let memory = session.memory().unwrap();
3695 memory
3696 .remember_success("write a file", &["write".to_string()], "done")
3697 .await
3698 .unwrap();
3699
3700 let results = memory.recall_similar("write", 5).await.unwrap();
3701 assert!(!results.is_empty());
3702 let stats = memory.stats().await.unwrap();
3703 assert_eq!(stats.long_term_count, 1);
3704 }
3705
3706 #[tokio::test(flavor = "multi_thread")]
3711 async fn test_session_tool_timeout_configured() {
3712 let agent = Agent::from_config(test_config()).await.unwrap();
3713 let opts = SessionOptions::new().with_tool_timeout(5000);
3714 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3715 assert!(!session.id().is_empty());
3716 }
3717
3718 #[tokio::test(flavor = "multi_thread")]
3723 async fn test_session_without_queue_builds_ok() {
3724 let agent = Agent::from_config(test_config()).await.unwrap();
3725 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
3726 assert!(!session.id().is_empty());
3727 }
3728
3729 #[tokio::test(flavor = "multi_thread")]
3734 async fn test_concurrent_history_reads() {
3735 let agent = Agent::from_config(test_config()).await.unwrap();
3736 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
3737
3738 let handles: Vec<_> = (0..10)
3739 .map(|_| {
3740 let s = Arc::clone(&session);
3741 tokio::spawn(async move { s.history().len() })
3742 })
3743 .collect();
3744
3745 for h in handles {
3746 h.await.unwrap();
3747 }
3748 }
3749
3750 #[tokio::test(flavor = "multi_thread")]
3755 async fn test_session_no_init_warning_without_file_memory() {
3756 let agent = Agent::from_config(test_config()).await.unwrap();
3757 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
3758 assert!(session.init_warning().is_none());
3759 }
3760
3761 #[tokio::test(flavor = "multi_thread")]
3762 async fn test_register_agent_dir_loads_agents_into_live_session() {
3763 let temp_dir = tempfile::tempdir().unwrap();
3764
3765 std::fs::write(
3767 temp_dir.path().join("my-agent.yaml"),
3768 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
3769 )
3770 .unwrap();
3771
3772 let agent = Agent::from_config(test_config()).await.unwrap();
3773 let session = agent.session(".", None).unwrap();
3774
3775 assert!(!session.agent_registry.exists("my-dynamic-agent"));
3777
3778 let count = session.register_agent_dir(temp_dir.path());
3779 assert_eq!(count, 1);
3780 assert!(session.agent_registry.exists("my-dynamic-agent"));
3781 }
3782
3783 #[tokio::test(flavor = "multi_thread")]
3784 async fn test_register_agent_dir_empty_dir_returns_zero() {
3785 let temp_dir = tempfile::tempdir().unwrap();
3786 let agent = Agent::from_config(test_config()).await.unwrap();
3787 let session = agent.session(".", None).unwrap();
3788 let count = session.register_agent_dir(temp_dir.path());
3789 assert_eq!(count, 0);
3790 }
3791
3792 #[tokio::test(flavor = "multi_thread")]
3793 async fn test_register_agent_dir_nonexistent_returns_zero() {
3794 let agent = Agent::from_config(test_config()).await.unwrap();
3795 let session = agent.session(".", None).unwrap();
3796 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
3797 assert_eq!(count, 0);
3798 }
3799
3800 #[tokio::test(flavor = "multi_thread")]
3801 async fn test_session_with_mcp_manager_builds_ok() {
3802 use crate::mcp::manager::McpManager;
3803 let mcp = Arc::new(McpManager::new());
3804 let agent = Agent::from_config(test_config()).await.unwrap();
3805 let opts = SessionOptions::new().with_mcp(mcp);
3806 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
3808 assert!(!session.id().is_empty());
3809 }
3810
/// Compile-time check that `SessionCommand` is importable from the crate root
/// and usable as a trait object; there is nothing to assert at runtime.
#[test]
fn test_session_command_is_pub() {
    use crate::SessionCommand;

    // Naming the trait-object type is enough: this fails to compile if the
    // re-export is missing.
    let _: std::marker::PhantomData<Box<dyn SessionCommand>> = std::marker::PhantomData;
}
3817
3818 #[tokio::test(flavor = "multi_thread")]
3819 async fn test_session_submit_with_queue_executes() {
3820 let agent = Agent::from_config(test_config()).await.unwrap();
3821 let qc = SessionQueueConfig::default();
3822 let opts = SessionOptions::new().with_queue_config(qc);
3823 let session = agent
3824 .session("/tmp/test-ws-submit-exec", Some(opts))
3825 .unwrap();
3826
3827 struct Echo(serde_json::Value);
3828 #[async_trait::async_trait]
3829 impl crate::queue::SessionCommand for Echo {
3830 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
3831 Ok(self.0.clone())
3832 }
3833 fn command_type(&self) -> &str {
3834 "echo"
3835 }
3836 }
3837
3838 let rx = session
3839 .submit(
3840 SessionLane::Query,
3841 Box::new(Echo(serde_json::json!({"ok": true}))),
3842 )
3843 .await
3844 .expect("submit should succeed with queue configured");
3845
3846 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
3847 .await
3848 .expect("timed out waiting for command result")
3849 .expect("channel closed before result")
3850 .expect("command returned an error");
3851
3852 assert_eq!(result["ok"], true);
3853 }
3854}