1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::{PlanningMode, SystemPromptSlots};
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::task::{ProgressTracker, TaskManager};
35use crate::text::truncate_utf8;
36use crate::tools::{ToolContext, ToolExecutor};
37use a3s_lane::{DeadLetter, MetricsSnapshot};
38use a3s_memory::{FileMemoryStore, MemoryStore};
39use anyhow::Context;
40use std::collections::HashMap;
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::sync::{Arc, RwLock};
44use tokio::sync::{broadcast, mpsc};
45use tokio::task::JoinHandle;
46
47fn safe_canonicalize(path: &Path) -> PathBuf {
50 match std::fs::canonicalize(path) {
51 Ok(p) => strip_unc_prefix(p),
52 Err(_) => path.to_path_buf(),
53 }
54}
55
/// Remove the Windows verbatim prefix (`\\?\`) from `path`.
///
/// On Windows two forms are handled:
///
/// * `\\?\UNC\server\share\...` becomes `\\server\share\...` (a regular UNC path);
/// * `\\?\C:\...` becomes `C:\...`.
///
/// Paths without the prefix are returned untouched. On non-Windows targets the
/// function is the identity.
///
/// The prefix test goes through `to_string_lossy`, so a prefixed path that is
/// not valid Unicode is rebuilt from its lossy rendering.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        let s = path.to_string_lossy();
        // Check the longer UNC form first: stripping only `\\?\` from it would
        // leave `UNC\server\share`, which is a relative path, not a network path.
        if let Some(rest) = s.strip_prefix(r"\\?\UNC\") {
            return PathBuf::from(format!(r"\\{rest}"));
        }
        if let Some(stripped) = s.strip_prefix(r"\\?\") {
            return PathBuf::from(stripped);
        }
    }
    path
}
68
/// Outcome record for a single tool call.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Tool name (presumably the tool that was invoked — confirm at the producer).
    pub name: String,
    /// Textual output of the call.
    pub output: String,
    /// Exit status of the call. NOTE(review): `0` presumably means success — confirm.
    pub exit_code: i32,
    /// Optional free-form JSON attached to the result.
    pub metadata: Option<serde_json::Value>,
}
81
/// Per-session overrides and integrations supplied when a session is created.
///
/// Every field defaults to "unset" (`None`, empty, `false`, or the type's
/// `Default`); the `with_*` builder methods set them.
#[derive(Clone, Default)]
pub struct SessionOptions {
    /// Model override in `provider/model` form; when `None` the agent's
    /// `default_model` is used.
    pub model: Option<String>,
    /// Extra directories scanned for agent definitions, in addition to the
    /// directories listed in the agent configuration.
    pub agent_dirs: Vec<PathBuf>,
    /// When set, a lane queue is created and started for the session.
    pub queue_config: Option<SessionQueueConfig>,
    /// Security provider forwarded to the session's agent configuration.
    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
    /// Context providers forwarded to the session's agent configuration.
    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
    /// Confirmation provider forwarded to the session's agent configuration.
    pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
    /// Permission checker forwarded to the session's agent configuration.
    pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
    /// Planning mode forwarded to the session's agent configuration.
    pub planning_mode: PlanningMode,
    /// Goal-tracking flag forwarded to the session's agent configuration.
    pub goal_tracking: bool,
    /// Directories whose skills are loaded into the session's skill registry.
    pub skill_dirs: Vec<PathBuf>,
    /// Registry whose skills are merged into the session's skill registry.
    pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
    /// Explicit memory store; takes precedence over `file_memory_dir`.
    pub memory_store: Option<Arc<dyn MemoryStore>>,
    /// Directory for a file-backed memory store, created when the session is built.
    pub(crate) file_memory_dir: Option<PathBuf>,
    /// Session persistence backend; required by `Agent::resume_session`.
    pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// Explicit session id; a random UUID is generated when `None`.
    pub session_id: Option<String>,
    /// Auto-save flag stored on the session.
    pub auto_save: bool,
    /// Overrides the agent-level `max_parse_retries` when set.
    pub max_parse_retries: Option<u32>,
    /// Overrides the agent-level tool timeout (milliseconds) when set.
    pub tool_timeout_ms: Option<u64>,
    /// Overrides the agent-level circuit-breaker threshold when set.
    pub circuit_breaker_threshold: Option<u32>,
    /// Sandbox configuration. In the session builder visible in this file it has
    /// no effect without `sandbox_handle` beyond emitting a warning.
    pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
    /// Sandbox implementation installed on the tool registry and tool context.
    pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
    /// Auto-compaction flag forwarded to the session's agent configuration.
    pub auto_compact: bool,
    /// Auto-compaction threshold; the crate default is used when `None`.
    pub auto_compact_threshold: Option<f32>,
    /// Overrides the agent-level continuation flag when set.
    pub continuation_enabled: Option<bool>,
    /// Overrides the agent-level maximum continuation turns when set.
    pub max_continuation_turns: Option<u32>,
    /// Session-level MCP manager.
    pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
    /// Sampling temperature; only applied when `model` is also set.
    pub temperature: Option<f32>,
    /// Thinking budget; only applied when `model` is also set.
    pub thinking_budget: Option<usize>,
    /// Overrides the agent-level maximum number of tool rounds when set.
    pub max_tool_rounds: Option<usize>,
    /// System prompt slots; the agent-level slots are used when `None`.
    pub prompt_slots: Option<SystemPromptSlots>,
    /// Hook executor stored on the session.
    pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Plugins loaded while the session is being built.
    pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
}
199
200impl std::fmt::Debug for SessionOptions {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("SessionOptions")
203 .field("model", &self.model)
204 .field("agent_dirs", &self.agent_dirs)
205 .field("skill_dirs", &self.skill_dirs)
206 .field("queue_config", &self.queue_config)
207 .field("security_provider", &self.security_provider.is_some())
208 .field("context_providers", &self.context_providers.len())
209 .field("confirmation_manager", &self.confirmation_manager.is_some())
210 .field("permission_checker", &self.permission_checker.is_some())
211 .field("planning_mode", &self.planning_mode)
212 .field("goal_tracking", &self.goal_tracking)
213 .field(
214 "skill_registry",
215 &self
216 .skill_registry
217 .as_ref()
218 .map(|r| format!("{} skills", r.len())),
219 )
220 .field("memory_store", &self.memory_store.is_some())
221 .field("session_store", &self.session_store.is_some())
222 .field("session_id", &self.session_id)
223 .field("auto_save", &self.auto_save)
224 .field("max_parse_retries", &self.max_parse_retries)
225 .field("tool_timeout_ms", &self.tool_timeout_ms)
226 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
227 .field("sandbox_config", &self.sandbox_config)
228 .field("auto_compact", &self.auto_compact)
229 .field("auto_compact_threshold", &self.auto_compact_threshold)
230 .field("continuation_enabled", &self.continuation_enabled)
231 .field("max_continuation_turns", &self.max_continuation_turns)
232 .field(
233 "plugins",
234 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
235 )
236 .field("mcp_manager", &self.mcp_manager.is_some())
237 .field("temperature", &self.temperature)
238 .field("thinking_budget", &self.thinking_budget)
239 .field("max_tool_rounds", &self.max_tool_rounds)
240 .field("prompt_slots", &self.prompt_slots.is_some())
241 .finish()
242 }
243}
244
245impl SessionOptions {
246 pub fn new() -> Self {
247 Self::default()
248 }
249
250 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
255 self.plugins.push(std::sync::Arc::new(plugin));
256 self
257 }
258
259 pub fn with_model(mut self, model: impl Into<String>) -> Self {
260 self.model = Some(model.into());
261 self
262 }
263
264 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
265 self.agent_dirs.push(dir.into());
266 self
267 }
268
269 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
270 self.queue_config = Some(config);
271 self
272 }
273
274 pub fn with_default_security(mut self) -> Self {
276 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
277 self
278 }
279
280 pub fn with_security_provider(
282 mut self,
283 provider: Arc<dyn crate::security::SecurityProvider>,
284 ) -> Self {
285 self.security_provider = Some(provider);
286 self
287 }
288
289 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
291 let config = crate::context::FileSystemContextConfig::new(root_path);
292 self.context_providers
293 .push(Arc::new(crate::context::FileSystemContextProvider::new(
294 config,
295 )));
296 self
297 }
298
299 pub fn with_context_provider(
301 mut self,
302 provider: Arc<dyn crate::context::ContextProvider>,
303 ) -> Self {
304 self.context_providers.push(provider);
305 self
306 }
307
308 pub fn with_confirmation_manager(
310 mut self,
311 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
312 ) -> Self {
313 self.confirmation_manager = Some(manager);
314 self
315 }
316
317 pub fn with_permission_checker(
319 mut self,
320 checker: Arc<dyn crate::permissions::PermissionChecker>,
321 ) -> Self {
322 self.permission_checker = Some(checker);
323 self
324 }
325
326 pub fn with_permissive_policy(self) -> Self {
333 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
334 }
335
336 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
338 self.planning_mode = mode;
339 self
340 }
341
342 pub fn with_planning(mut self, enabled: bool) -> Self {
344 self.planning_mode = if enabled {
345 PlanningMode::Enabled
346 } else {
347 PlanningMode::Disabled
348 };
349 self
350 }
351
352 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
354 self.goal_tracking = enabled;
355 self
356 }
357
358 pub fn with_builtin_skills(mut self) -> Self {
360 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
361 self
362 }
363
364 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
366 self.skill_registry = Some(registry);
367 self
368 }
369
370 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
373 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
374 self
375 }
376
377 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
379 let registry = self
380 .skill_registry
381 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
382 if let Err(e) = registry.load_from_dir(&dir) {
383 tracing::warn!(
384 dir = %dir.as_ref().display(),
385 error = %e,
386 "Failed to load skills from directory — continuing without them"
387 );
388 }
389 self.skill_registry = Some(registry);
390 self
391 }
392
393 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
395 self.memory_store = Some(store);
396 self
397 }
398
399 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
405 self.file_memory_dir = Some(dir.into());
406 self
407 }
408
409 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
411 self.session_store = Some(store);
412 self
413 }
414
415 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
417 let dir = dir.into();
418 match tokio::runtime::Handle::try_current() {
419 Ok(handle) => {
420 match tokio::task::block_in_place(|| {
421 handle.block_on(crate::store::FileSessionStore::new(dir))
422 }) {
423 Ok(store) => {
424 self.session_store =
425 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
426 }
427 Err(e) => {
428 tracing::warn!("Failed to create file session store: {}", e);
429 }
430 }
431 }
432 Err(_) => {
433 tracing::warn!(
434 "No async runtime available for file session store — persistence disabled"
435 );
436 }
437 }
438 self
439 }
440
441 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
443 self.session_id = Some(id.into());
444 self
445 }
446
447 pub fn with_auto_save(mut self, enabled: bool) -> Self {
449 self.auto_save = enabled;
450 self
451 }
452
453 pub fn with_parse_retries(mut self, max: u32) -> Self {
459 self.max_parse_retries = Some(max);
460 self
461 }
462
463 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
469 self.tool_timeout_ms = Some(timeout_ms);
470 self
471 }
472
473 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
479 self.circuit_breaker_threshold = Some(threshold);
480 self
481 }
482
483 pub fn with_resilience_defaults(self) -> Self {
489 self.with_parse_retries(2)
490 .with_tool_timeout(120_000)
491 .with_circuit_breaker(3)
492 }
493
494 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
513 self.sandbox_config = Some(config);
514 self
515 }
516
517 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
525 self.sandbox_handle = Some(handle);
526 self
527 }
528
529 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
534 self.auto_compact = enabled;
535 self
536 }
537
538 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
540 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
541 self
542 }
543
544 pub fn with_continuation(mut self, enabled: bool) -> Self {
549 self.continuation_enabled = Some(enabled);
550 self
551 }
552
553 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
555 self.max_continuation_turns = Some(turns);
556 self
557 }
558
559 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
564 self.mcp_manager = Some(manager);
565 self
566 }
567
568 pub fn with_temperature(mut self, temperature: f32) -> Self {
569 self.temperature = Some(temperature);
570 self
571 }
572
573 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
574 self.thinking_budget = Some(budget);
575 self
576 }
577
578 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
583 self.max_tool_rounds = Some(rounds);
584 self
585 }
586
587 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
592 self.prompt_slots = Some(slots);
593 self
594 }
595
596 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
602 self.hook_executor = Some(executor);
603 self
604 }
605}
606
/// Long-lived agent handle from which sessions are created.
pub struct Agent {
    /// LLM client built from the configuration's default model.
    llm_client: Arc<dyn LlmClient>,
    /// The loaded configuration (providers, directories, MCP servers, ...).
    code_config: CodeConfig,
    /// Agent-level defaults that each session's configuration starts from.
    config: AgentConfig,
    /// MCP manager for servers declared in the configuration; `None` when the
    /// configuration lists no MCP servers.
    global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
    /// Cached `(server, tool)` list from `global_mcp`; replaced by
    /// `refresh_mcp_tools`.
    global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
}
625
626impl std::fmt::Debug for Agent {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 f.debug_struct("Agent").finish()
629 }
630}
631
632impl Agent {
633 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
637 let source = config_source.into();
638
639 let expanded = if let Some(rest) = source.strip_prefix("~/") {
641 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
642 if let Some(home) = home {
643 PathBuf::from(home).join(rest).display().to_string()
644 } else {
645 source.clone()
646 }
647 } else {
648 source.clone()
649 };
650
651 let path = Path::new(&expanded);
652
653 let config = if matches!(
654 path.extension().and_then(|ext| ext.to_str()),
655 Some("hcl" | "json")
656 ) {
657 if !path.exists() {
658 return Err(CodeError::Config(format!(
659 "Config file not found: {}",
660 path.display()
661 )));
662 }
663
664 CodeConfig::from_file(path)
665 .with_context(|| format!("Failed to load config: {}", path.display()))?
666 } else {
667 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
669 };
670
671 Self::from_config(config).await
672 }
673
674 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
679 Self::new(config_source).await
680 }
681
682 pub async fn from_config(config: CodeConfig) -> Result<Self> {
684 let llm_config = config
685 .default_llm_config()
686 .context("default_model must be set in 'provider/model' format with a valid API key")?;
687 let llm_client = crate::llm::create_client_with_config(llm_config);
688
689 let agent_config = AgentConfig {
690 max_tool_rounds: config
691 .max_tool_rounds
692 .unwrap_or(AgentConfig::default().max_tool_rounds),
693 ..AgentConfig::default()
694 };
695
696 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
698 (None, vec![])
699 } else {
700 let manager = Arc::new(crate::mcp::manager::McpManager::new());
701 for server in &config.mcp_servers {
702 if !server.enabled {
703 continue;
704 }
705 manager.register_server(server.clone()).await;
706 if let Err(e) = manager.connect(&server.name).await {
707 tracing::warn!(
708 server = %server.name,
709 error = %e,
710 "Failed to connect to MCP server — skipping"
711 );
712 }
713 }
714 let tools = manager.get_all_tools().await;
716 (Some(manager), tools)
717 };
718
719 let mut agent = Agent {
720 llm_client,
721 code_config: config,
722 config: agent_config,
723 global_mcp,
724 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
725 };
726
727 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
729 for dir in &agent.code_config.skill_dirs.clone() {
730 if let Err(e) = registry.load_from_dir(dir) {
731 tracing::warn!(
732 dir = %dir.display(),
733 error = %e,
734 "Failed to load skills from directory — skipping"
735 );
736 }
737 }
738 agent.config.skill_registry = Some(registry);
739
740 Ok(agent)
741 }
742
743 pub async fn refresh_mcp_tools(&self) -> Result<()> {
751 if let Some(ref mcp) = self.global_mcp {
752 let fresh = mcp.get_all_tools().await;
753 *self
754 .global_mcp_tools
755 .lock()
756 .expect("global_mcp_tools lock poisoned") = fresh;
757 }
758 Ok(())
759 }
760
761 pub fn session(
766 &self,
767 workspace: impl Into<String>,
768 options: Option<SessionOptions>,
769 ) -> Result<AgentSession> {
770 let opts = options.unwrap_or_default();
771
772 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
775 (Some(global), Some(session)) => {
776 let global = Arc::clone(global);
777 let session_mgr = Arc::clone(session);
778 match tokio::runtime::Handle::try_current() {
779 Ok(handle) => {
780 let global_for_merge = Arc::clone(&global);
781 tokio::task::block_in_place(|| {
782 handle.block_on(async move {
783 for config in session_mgr.all_configs().await {
784 let name = config.name.clone();
785 global_for_merge.register_server(config).await;
786 if let Err(e) = global_for_merge.connect(&name).await {
787 tracing::warn!(
788 server = %name,
789 error = %e,
790 "Failed to connect session-level MCP server — skipping"
791 );
792 }
793 }
794 })
795 });
796 }
797 Err(_) => {
798 tracing::warn!(
799 "No async runtime available to merge session-level MCP servers \
800 into global manager — session MCP servers will not be available"
801 );
802 }
803 }
804 SessionOptions {
805 mcp_manager: Some(Arc::clone(&global)),
806 ..opts
807 }
808 }
809 (Some(global), None) => SessionOptions {
810 mcp_manager: Some(Arc::clone(global)),
811 ..opts
812 },
813 _ => opts,
814 };
815
816 let session_id = merged_opts
817 .session_id
818 .clone()
819 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
820 merged_opts.session_id = Some(session_id.clone());
821 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
822
823 self.build_session(workspace.into(), llm_client, &merged_opts)
824 }
825
826 pub fn session_for_agent(
841 &self,
842 workspace: impl Into<String>,
843 def: &crate::subagent::AgentDefinition,
844 extra: Option<SessionOptions>,
845 ) -> Result<AgentSession> {
846 let mut opts = extra.unwrap_or_default();
847
848 if opts.permission_checker.is_none()
850 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
851 {
852 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
853 }
854
855 if opts.max_tool_rounds.is_none() {
857 if let Some(steps) = def.max_steps {
858 opts.max_tool_rounds = Some(steps);
859 }
860 }
861
862 if opts.model.is_none() {
864 if let Some(ref m) = def.model {
865 let provider = m.provider.as_deref().unwrap_or("anthropic");
866 opts.model = Some(format!("{}/{}", provider, m.model));
867 }
868 }
869
870 if let Some(ref prompt) = def.prompt {
877 let slots = opts
878 .prompt_slots
879 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
880 if slots.extra.is_none() {
881 slots.extra = Some(prompt.clone());
882 }
883 }
884
885 self.session(workspace, Some(opts))
886 }
887
888 pub fn resume_session(
896 &self,
897 session_id: &str,
898 options: SessionOptions,
899 ) -> Result<AgentSession> {
900 let store = options.session_store.as_ref().ok_or_else(|| {
901 crate::error::CodeError::Session(
902 "resume_session requires a session_store in SessionOptions".to_string(),
903 )
904 })?;
905
906 let data = match tokio::runtime::Handle::try_current() {
908 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
909 .map_err(|e| {
910 crate::error::CodeError::Session(format!(
911 "Failed to load session {}: {}",
912 session_id, e
913 ))
914 })?,
915 Err(_) => {
916 return Err(crate::error::CodeError::Session(
917 "No async runtime available for session resume".to_string(),
918 ))
919 }
920 };
921
922 let data = data.ok_or_else(|| {
923 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
924 })?;
925
926 let mut opts = options;
928 opts.session_id = Some(data.id.clone());
929 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
930
931 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
932
933 *write_or_recover(&session.history) = data.messages;
935
936 Ok(session)
937 }
938
939 fn resolve_session_llm_client(
940 &self,
941 opts: &SessionOptions,
942 session_id: Option<&str>,
943 ) -> Result<Arc<dyn LlmClient>> {
944 let model_ref = if let Some(ref model) = opts.model {
945 model.as_str()
946 } else {
947 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
948 tracing::warn!(
949 "temperature/thinking_budget set without model override — these will be ignored. \
950 Use with_model() to apply LLM parameter overrides."
951 );
952 }
953 self.code_config
954 .default_model
955 .as_deref()
956 .context("default_model must be set in 'provider/model' format")?
957 };
958
959 let (provider_name, model_id) = model_ref
960 .split_once('/')
961 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
962
963 let mut llm_config = self
964 .code_config
965 .llm_config(provider_name, model_id)
966 .with_context(|| {
967 format!("provider '{provider_name}' or model '{model_id}' not found in config")
968 })?;
969
970 if opts.model.is_some() {
971 if let Some(temp) = opts.temperature {
972 llm_config = llm_config.with_temperature(temp);
973 }
974 if let Some(budget) = opts.thinking_budget {
975 llm_config = llm_config.with_thinking_budget(budget);
976 }
977 }
978
979 if let Some(session_id) = session_id {
980 llm_config = llm_config.with_session_id(session_id);
981 }
982
983 Ok(crate::llm::create_client_with_config(llm_config))
984 }
985
986 fn build_session(
987 &self,
988 workspace: String,
989 llm_client: Arc<dyn LlmClient>,
990 opts: &SessionOptions,
991 ) -> Result<AgentSession> {
992 let canonical = safe_canonicalize(Path::new(&workspace));
993
994 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
995
996 if let Some(ref search_config) = self.code_config.search {
998 tool_executor
999 .registry()
1000 .set_search_config(search_config.clone());
1001 }
1002
1003 let agent_registry = {
1007 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1008 use crate::tools::register_task_with_mcp;
1009 let registry = AgentRegistry::new();
1010 for dir in self
1011 .code_config
1012 .agent_dirs
1013 .iter()
1014 .chain(opts.agent_dirs.iter())
1015 {
1016 for agent in load_agents_from_dir(dir) {
1017 registry.register(agent);
1018 }
1019 }
1020 let registry = Arc::new(registry);
1021 register_task_with_mcp(
1022 tool_executor.registry(),
1023 Arc::clone(&llm_client),
1024 Arc::clone(®istry),
1025 canonical.display().to_string(),
1026 opts.mcp_manager.clone(),
1027 );
1028 registry
1029 };
1030
1031 if let Some(ref mcp) = opts.mcp_manager {
1034 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1037 Arc::as_ptr(mcp),
1038 self.global_mcp
1039 .as_ref()
1040 .map(Arc::as_ptr)
1041 .unwrap_or(std::ptr::null()),
1042 ) {
1043 self.global_mcp_tools
1045 .lock()
1046 .expect("global_mcp_tools lock poisoned")
1047 .clone()
1048 } else {
1049 match tokio::runtime::Handle::try_current() {
1051 Ok(handle) => {
1052 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1053 }
1054 Err(_) => {
1055 tracing::warn!(
1056 "No async runtime available for session-level MCP tools — \
1057 MCP tools will not be registered"
1058 );
1059 vec![]
1060 }
1061 }
1062 };
1063
1064 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1065 std::collections::HashMap::new();
1066 for (server, tool) in all_tools {
1067 by_server.entry(server).or_default().push(tool);
1068 }
1069 for (server_name, tools) in by_server {
1070 for tool in
1071 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1072 {
1073 tool_executor.register_dynamic_tool(tool);
1074 }
1075 }
1076 }
1077
1078 let tool_defs = tool_executor.definitions();
1079
1080 let mut prompt_slots = opts
1082 .prompt_slots
1083 .clone()
1084 .unwrap_or_else(|| self.config.prompt_slots.clone());
1085
1086 let agents_md_path = canonical.join("AGENTS.md");
1088 if agents_md_path.exists() && agents_md_path.is_file() {
1089 match std::fs::read_to_string(&agents_md_path) {
1090 Ok(content) if !content.trim().is_empty() => {
1091 tracing::info!(
1092 path = %agents_md_path.display(),
1093 "Auto-loaded AGENTS.md from workspace root"
1094 );
1095 prompt_slots.extra = match prompt_slots.extra {
1096 Some(existing) => Some(format!(
1097 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1098 existing, content
1099 )),
1100 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1101 };
1102 }
1103 Ok(_) => {
1104 tracing::debug!(
1105 path = %agents_md_path.display(),
1106 "AGENTS.md exists but is empty — skipping"
1107 );
1108 }
1109 Err(e) => {
1110 tracing::warn!(
1111 path = %agents_md_path.display(),
1112 error = %e,
1113 "Failed to read AGENTS.md — skipping"
1114 );
1115 }
1116 }
1117 }
1118
1119 let base_registry = self
1123 .config
1124 .skill_registry
1125 .as_deref()
1126 .map(|r| r.fork())
1127 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1128 if let Some(ref r) = opts.skill_registry {
1130 for skill in r.all() {
1131 base_registry.register_unchecked(skill);
1132 }
1133 }
1134 for dir in &opts.skill_dirs {
1136 if let Err(e) = base_registry.load_from_dir(dir) {
1137 tracing::warn!(
1138 dir = %dir.display(),
1139 error = %e,
1140 "Failed to load session skill dir — skipping"
1141 );
1142 }
1143 }
1144 let effective_registry = Arc::new(base_registry);
1145
1146 if !opts.plugins.is_empty() {
1149 use crate::plugin::PluginContext;
1150 let plugin_ctx = PluginContext::new()
1151 .with_llm(Arc::clone(&self.llm_client))
1152 .with_skill_registry(Arc::clone(&effective_registry));
1153 let plugin_registry = tool_executor.registry();
1154 for plugin in &opts.plugins {
1155 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1156 match plugin.load(plugin_registry, &plugin_ctx) {
1157 Ok(()) => {
1158 for skill in plugin.skills() {
1159 tracing::debug!(
1160 "Plugin '{}' registered skill '{}'",
1161 plugin.name(),
1162 skill.name
1163 );
1164 effective_registry.register_unchecked(skill);
1165 }
1166 }
1167 Err(e) => {
1168 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1169 }
1170 }
1171 }
1172 }
1173
1174 let skill_prompt = effective_registry.to_system_prompt();
1176 if !skill_prompt.is_empty() {
1177 prompt_slots.extra = match prompt_slots.extra {
1178 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1179 None => Some(skill_prompt),
1180 };
1181 }
1182
1183 let mut init_warning: Option<String> = None;
1185 let memory = {
1186 let store = if let Some(ref store) = opts.memory_store {
1187 Some(Arc::clone(store))
1188 } else if let Some(ref dir) = opts.file_memory_dir {
1189 match tokio::runtime::Handle::try_current() {
1190 Ok(handle) => {
1191 let dir = dir.clone();
1192 match tokio::task::block_in_place(|| {
1193 handle.block_on(FileMemoryStore::new(dir))
1194 }) {
1195 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1196 Err(e) => {
1197 let msg = format!("Failed to create file memory store: {}", e);
1198 tracing::warn!("{}", msg);
1199 init_warning = Some(msg);
1200 None
1201 }
1202 }
1203 }
1204 Err(_) => {
1205 let msg =
1206 "No async runtime available for file memory store — memory disabled"
1207 .to_string();
1208 tracing::warn!("{}", msg);
1209 init_warning = Some(msg);
1210 None
1211 }
1212 }
1213 } else {
1214 None
1215 };
1216 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1217 };
1218
1219 let base = self.config.clone();
1220 let config = AgentConfig {
1221 prompt_slots,
1222 tools: tool_defs,
1223 security_provider: opts.security_provider.clone(),
1224 permission_checker: opts.permission_checker.clone(),
1225 confirmation_manager: opts.confirmation_manager.clone(),
1226 context_providers: opts.context_providers.clone(),
1227 planning_mode: opts.planning_mode,
1228 goal_tracking: opts.goal_tracking,
1229 skill_registry: Some(Arc::clone(&effective_registry)),
1230 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1231 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1232 circuit_breaker_threshold: opts
1233 .circuit_breaker_threshold
1234 .unwrap_or(base.circuit_breaker_threshold),
1235 auto_compact: opts.auto_compact,
1236 auto_compact_threshold: opts
1237 .auto_compact_threshold
1238 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1239 max_context_tokens: base.max_context_tokens,
1240 llm_client: Some(Arc::clone(&llm_client)),
1241 memory: memory.clone(),
1242 continuation_enabled: opts
1243 .continuation_enabled
1244 .unwrap_or(base.continuation_enabled),
1245 max_continuation_turns: opts
1246 .max_continuation_turns
1247 .unwrap_or(base.max_continuation_turns),
1248 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1249 ..base
1250 };
1251
1252 {
1256 use crate::tools::register_skill;
1257 register_skill(
1258 tool_executor.registry(),
1259 Arc::clone(&llm_client),
1260 Arc::clone(&effective_registry),
1261 Arc::clone(&tool_executor),
1262 config.clone(),
1263 );
1264 }
1265
1266 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1269 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1270 let session_id = uuid::Uuid::new_v4().to_string();
1271 let rt = tokio::runtime::Handle::try_current();
1272
1273 match rt {
1274 Ok(handle) => {
1275 let queue = tokio::task::block_in_place(|| {
1277 handle.block_on(SessionLaneQueue::new(
1278 &session_id,
1279 queue_config.clone(),
1280 agent_event_tx.clone(),
1281 ))
1282 });
1283 match queue {
1284 Ok(q) => {
1285 let q = Arc::new(q);
1287 let q2 = Arc::clone(&q);
1288 tokio::task::block_in_place(|| {
1289 handle.block_on(async { q2.start().await.ok() })
1290 });
1291 Some(q)
1292 }
1293 Err(e) => {
1294 tracing::warn!("Failed to create session lane queue: {}", e);
1295 None
1296 }
1297 }
1298 }
1299 Err(_) => {
1300 tracing::warn!(
1301 "No async runtime available for queue creation — queue disabled"
1302 );
1303 None
1304 }
1305 }
1306 } else {
1307 None
1308 };
1309
1310 let mut tool_context = ToolContext::new(canonical.clone());
1312 if let Some(ref search_config) = self.code_config.search {
1313 tool_context = tool_context.with_search_config(search_config.clone());
1314 }
1315 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1316
1317 if let Some(handle) = opts.sandbox_handle.clone() {
1319 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1320 tool_context = tool_context.with_sandbox(handle);
1321 } else if opts.sandbox_config.is_some() {
1322 tracing::warn!(
1323 "sandbox_config is set but no sandbox_handle was provided \
1324 — bash commands will run locally"
1325 );
1326 }
1327
1328 let session_id = opts
1329 .session_id
1330 .clone()
1331 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1332
1333 let session_store = if opts.session_store.is_some() {
1335 opts.session_store.clone()
1336 } else if let Some(ref dir) = self.code_config.sessions_dir {
1337 match tokio::runtime::Handle::try_current() {
1338 Ok(handle) => {
1339 let dir = dir.clone();
1340 match tokio::task::block_in_place(|| {
1341 handle.block_on(crate::store::FileSessionStore::new(dir))
1342 }) {
1343 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1344 Err(e) => {
1345 tracing::warn!(
1346 "Failed to create session store from sessions_dir: {}",
1347 e
1348 );
1349 None
1350 }
1351 }
1352 }
1353 Err(_) => {
1354 tracing::warn!(
1355 "No async runtime for sessions_dir store — persistence disabled"
1356 );
1357 None
1358 }
1359 }
1360 } else {
1361 None
1362 };
1363
1364 let (cron_scheduler, cron_rx) = CronScheduler::new();
1368 let mut command_registry = CommandRegistry::new();
1369 command_registry.register(Arc::new(LoopCommand {
1370 scheduler: Arc::clone(&cron_scheduler),
1371 }));
1372 command_registry.register(Arc::new(CronListCommand {
1373 scheduler: Arc::clone(&cron_scheduler),
1374 }));
1375 command_registry.register(Arc::new(CronCancelCommand {
1376 scheduler: Arc::clone(&cron_scheduler),
1377 }));
1378
1379 Ok(AgentSession {
1380 llm_client,
1381 tool_executor,
1382 tool_context,
1383 memory: config.memory.clone(),
1384 config,
1385 workspace: canonical,
1386 session_id,
1387 history: RwLock::new(Vec::new()),
1388 command_queue,
1389 session_store,
1390 auto_save: opts.auto_save,
1391 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1392 ahp_executor: opts.hook_executor.clone(),
1393 init_warning,
1394 command_registry: std::sync::Mutex::new(command_registry),
1395 model_name: opts
1396 .model
1397 .clone()
1398 .or_else(|| self.code_config.default_model.clone())
1399 .unwrap_or_else(|| "unknown".to_string()),
1400 mcp_manager: opts
1401 .mcp_manager
1402 .clone()
1403 .or_else(|| self.global_mcp.clone())
1404 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1405 agent_registry,
1406 cron_scheduler,
1407 cron_rx: tokio::sync::Mutex::new(cron_rx),
1408 is_processing_cron: AtomicBool::new(false),
1409 cron_started: AtomicBool::new(false),
1410 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1411 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1412 task_manager: Arc::new(TaskManager::new()),
1413 progress_tracker: Arc::new(tokio::sync::RwLock::new(ProgressTracker::new(30))),
1414 })
1415 }
1416}
1417
/// Outcome of an ephemeral "btw" side question asked through
/// [`AgentSession::btw`] or [`AgentSession::btw_with_context`].
#[derive(Debug, Clone)]
pub struct BtwResult {
    /// The question that was asked, with surrounding whitespace trimmed.
    pub question: String,
    /// Text of the model's reply.
    pub answer: String,
    /// Token usage reported by the LLM for this single call.
    pub usage: crate::llm::TokenUsage,
}
1435
/// A single conversation session bound to one workspace.
///
/// Holds the LLM client, tool executor, conversation history and the
/// auxiliary subsystems (queue, hooks, cron scheduler, MCP, tasks) that the
/// `send` / `stream` family of methods drive.
pub struct AgentSession {
    /// LLM client used by the agent loop and by the ephemeral `btw` calls.
    llm_client: Arc<dyn LlmClient>,
    /// Executes tools and is the source of the live tool definitions.
    tool_executor: Arc<ToolExecutor>,
    /// Context handed to the agent loop and to context-aware tool calls.
    tool_context: ToolContext,
    /// Base agent configuration; cloned for every agent loop that is built.
    config: AgentConfig,
    /// Workspace directory of this session.
    workspace: PathBuf,
    /// Identifier of this session.
    session_id: String,
    /// Internal conversation history, used when callers pass no explicit history.
    history: RwLock<Vec<Message>>,
    /// Optional lane queue; attached to each agent loop when present.
    command_queue: Option<Arc<SessionLaneQueue>>,
    /// Optional agent memory, exposed through `memory()`.
    memory: Option<Arc<crate::memory::AgentMemory>>,
    /// Optional persistence backend used by `save()`.
    session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// When `true`, the session is saved after the internal history is updated.
    auto_save: bool,
    /// In-process hook engine; acts as the hook executor unless `ahp_executor` is set.
    hook_engine: Arc<crate::hooks::HookEngine>,
    /// Optional hook executor that takes precedence over `hook_engine`.
    ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Warning recorded at construction time; logged by `send` and exposed via `init_warning()`.
    init_warning: Option<String>,
    /// Registry of slash commands dispatched by `send` / `stream`.
    command_registry: std::sync::Mutex<CommandRegistry>,
    /// Model name reported in the slash-command context.
    model_name: String,
    /// MCP manager used to add and remove servers on the live session.
    mcp_manager: Arc<crate::mcp::manager::McpManager>,
    /// Registry that receives agents loaded by `register_agent_dir`.
    agent_registry: Arc<crate::subagent::AgentRegistry>,
    /// Scheduler backing the cron-style slash commands.
    cron_scheduler: Arc<CronScheduler>,
    /// Receiving end for scheduled fires; drained at the end of `send`.
    cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
    /// Set while `send` is draining scheduled fires, so nested sends skip the drain.
    is_processing_cron: AtomicBool,
    /// Set once the cron scheduler has been started.
    cron_started: AtomicBool,
    /// Cancellation token of the operation currently in flight, if any.
    cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
    /// Tools currently considered running, keyed by tool call id.
    active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
    /// Task manager shared with every agent loop.
    task_manager: Arc<TaskManager>,
    /// Progress tracker shared with every agent loop.
    progress_tracker: Arc<tokio::sync::RwLock<ProgressTracker>>,
}
1498
/// Bookkeeping entry for a tool call that has started but not yet finished.
#[derive(Debug, Clone)]
struct ActiveToolSnapshot {
    /// Name of the tool being executed.
    tool_name: String,
    /// Start time in milliseconds since the Unix epoch (see `AgentSession::now_ms`).
    started_at_ms: u64,
}
1504
1505impl std::fmt::Debug for AgentSession {
1506 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1507 f.debug_struct("AgentSession")
1508 .field("session_id", &self.session_id)
1509 .field("workspace", &self.workspace.display().to_string())
1510 .field("auto_save", &self.auto_save)
1511 .finish()
1512 }
1513}
1514
1515impl AgentSession {
1516 fn now_ms() -> u64 {
1517 std::time::SystemTime::now()
1518 .duration_since(std::time::UNIX_EPOCH)
1519 .map(|d| d.as_millis() as u64)
1520 .unwrap_or(0)
1521 }
1522
1523 fn compact_json_value(value: &serde_json::Value) -> String {
1524 let raw = match value {
1525 serde_json::Value::Null => String::new(),
1526 serde_json::Value::String(s) => s.clone(),
1527 _ => serde_json::to_string(value).unwrap_or_default(),
1528 };
1529 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1530 if compact.len() > 180 {
1531 format!("{}...", truncate_utf8(&compact, 180))
1532 } else {
1533 compact
1534 }
1535 }
1536
1537 async fn apply_runtime_event(
1538 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1539 event: &AgentEvent,
1540 ) {
1541 match event {
1542 AgentEvent::ToolStart { id, name } => {
1543 active_tools.write().await.insert(
1544 id.clone(),
1545 ActiveToolSnapshot {
1546 tool_name: name.clone(),
1547 started_at_ms: Self::now_ms(),
1548 },
1549 );
1550 }
1551 AgentEvent::ToolEnd { id, .. }
1552 | AgentEvent::PermissionDenied { tool_id: id, .. }
1553 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1554 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1555 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1556 active_tools.write().await.remove(id);
1557 }
1558 _ => {}
1559 }
1560 }
1561
1562 async fn clear_runtime_tracking(&self) {
1563 self.active_tools.write().await.clear();
1564 }
1565
1566 fn build_agent_loop(&self) -> AgentLoop {
1570 let mut config = self.config.clone();
1571 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1572 ahp.clone()
1573 } else {
1574 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1575 });
1576 config.tools = self.tool_executor.definitions();
1580 let mut agent_loop = AgentLoop::new(
1581 self.llm_client.clone(),
1582 self.tool_executor.clone(),
1583 self.tool_context.clone(),
1584 config,
1585 );
1586 if let Some(ref queue) = self.command_queue {
1587 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1588 }
1589 agent_loop = agent_loop.with_progress_tracker(Arc::clone(&self.progress_tracker));
1590 agent_loop = agent_loop.with_task_manager(Arc::clone(&self.task_manager));
1591 agent_loop
1592 }
1593
1594 fn build_command_context(&self) -> CommandContext {
1596 let history = read_or_recover(&self.history);
1597
1598 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1600
1601 let mut mcp_map: std::collections::HashMap<String, usize> =
1603 std::collections::HashMap::new();
1604 for name in &tool_names {
1605 if let Some(rest) = name.strip_prefix("mcp__") {
1606 if let Some((server, _)) = rest.split_once("__") {
1607 *mcp_map.entry(server.to_string()).or_default() += 1;
1608 }
1609 }
1610 }
1611 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1612 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1613
1614 CommandContext {
1615 session_id: self.session_id.clone(),
1616 workspace: self.workspace.display().to_string(),
1617 model: self.model_name.clone(),
1618 history_len: history.len(),
1619 total_tokens: 0,
1620 total_cost: 0.0,
1621 tool_names,
1622 mcp_servers,
1623 }
1624 }
1625
1626 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1630 self.command_registry
1631 .lock()
1632 .expect("command_registry lock poisoned")
1633 }
1634
1635 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1639 self.command_registry
1640 .lock()
1641 .expect("command_registry lock poisoned")
1642 .register(cmd);
1643 }
1644
1645 pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
1647 &self.cron_scheduler
1648 }
1649
1650 pub async fn close(&self) {
1652 let _ = self.cancel().await;
1653 self.cron_scheduler.stop();
1654 }
1655
1656 fn ensure_cron_started(&self) {
1661 if !self.cron_started.swap(true, Ordering::Relaxed) {
1662 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1663 }
1664 }
1665
1666 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1675 self.ensure_cron_started();
1678
1679 if CommandRegistry::is_command(prompt) {
1681 let ctx = self.build_command_context();
1682 let output = self.command_registry().dispatch(prompt, &ctx);
1683 if let Some(output) = output {
1685 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1687 let result = self.btw(question).await?;
1688 return Ok(AgentResult {
1689 text: result.answer,
1690 messages: history
1691 .map(|h| h.to_vec())
1692 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1693 tool_calls_count: 0,
1694 usage: result.usage,
1695 });
1696 }
1697 return Ok(AgentResult {
1698 text: output.text,
1699 messages: history
1700 .map(|h| h.to_vec())
1701 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1702 tool_calls_count: 0,
1703 usage: crate::llm::TokenUsage::default(),
1704 });
1705 }
1706 }
1707
1708 if let Some(ref w) = self.init_warning {
1709 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1710 }
1711 let agent_loop = self.build_agent_loop();
1712 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1713 let runtime_state = Arc::clone(&self.active_tools);
1714 let runtime_collector = tokio::spawn(async move {
1715 while let Some(event) = runtime_rx.recv().await {
1716 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1717 }
1718 });
1719
1720 let use_internal = history.is_none();
1721 let effective_history = match history {
1722 Some(h) => h.to_vec(),
1723 None => read_or_recover(&self.history).clone(),
1724 };
1725
1726 let cancel_token = tokio_util::sync::CancellationToken::new();
1727 *self.cancel_token.lock().await = Some(cancel_token.clone());
1728 let result = agent_loop
1729 .execute_with_session(
1730 &effective_history,
1731 prompt,
1732 Some(&self.session_id),
1733 Some(runtime_tx),
1734 Some(&cancel_token),
1735 )
1736 .await;
1737 *self.cancel_token.lock().await = None;
1738 let _ = runtime_collector.await;
1739 let result = result?;
1740
1741 if use_internal {
1744 *write_or_recover(&self.history) = result.messages.clone();
1745
1746 if self.auto_save {
1748 if let Err(e) = self.save().await {
1749 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1750 }
1751 }
1752 }
1753
1754 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1759 let fires = {
1760 let mut rx = self.cron_rx.lock().await;
1761 let mut fires: Vec<ScheduledFire> = Vec::new();
1762 while let Ok(fire) = rx.try_recv() {
1763 fires.push(fire);
1764 }
1765 fires
1766 };
1767 for fire in fires {
1768 tracing::debug!(
1769 task_id = %fire.task_id,
1770 "Firing scheduled cron task"
1771 );
1772 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1773 tracing::warn!(
1774 task_id = %fire.task_id,
1775 "Scheduled task failed: {e}"
1776 );
1777 }
1778 }
1779 self.is_processing_cron.store(false, Ordering::Relaxed);
1780 }
1781
1782 self.clear_runtime_tracking().await;
1783
1784 Ok(result)
1785 }
1786
1787 async fn build_btw_runtime_context(&self) -> String {
1788 let mut sections = Vec::new();
1789
1790 let active_tools = {
1791 let tools = self.active_tools.read().await;
1792 let mut items = tools
1793 .iter()
1794 .map(|(tool_id, tool)| {
1795 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1796 format!(
1797 "- {} [{}] running_for={}ms",
1798 tool.tool_name, tool_id, elapsed_ms
1799 )
1800 })
1801 .collect::<Vec<_>>();
1802 items.sort();
1803 items
1804 };
1805 if !active_tools.is_empty() {
1806 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1807 }
1808
1809 if let Some(cm) = &self.config.confirmation_manager {
1810 let pending = cm.pending_confirmations().await;
1811 if !pending.is_empty() {
1812 let mut lines = pending
1813 .into_iter()
1814 .map(
1815 |PendingConfirmationInfo {
1816 tool_id,
1817 tool_name,
1818 args,
1819 remaining_ms,
1820 }| {
1821 let arg_summary = Self::compact_json_value(&args);
1822 if arg_summary.is_empty() {
1823 format!(
1824 "- {} [{}] remaining={}ms",
1825 tool_name, tool_id, remaining_ms
1826 )
1827 } else {
1828 format!(
1829 "- {} [{}] remaining={}ms {}",
1830 tool_name, tool_id, remaining_ms, arg_summary
1831 )
1832 }
1833 },
1834 )
1835 .collect::<Vec<_>>();
1836 lines.sort();
1837 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1838 }
1839 }
1840
1841 if let Some(queue) = &self.command_queue {
1842 let stats = queue.stats().await;
1843 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1844 let mut lines = vec![format!(
1845 "active={}, pending={}, external_pending={}",
1846 stats.total_active, stats.total_pending, stats.external_pending
1847 )];
1848 let mut lanes = stats
1849 .lanes
1850 .into_values()
1851 .filter(|lane| lane.active > 0 || lane.pending > 0)
1852 .map(|lane| {
1853 format!(
1854 "- {:?}: active={}, pending={}, handler={:?}",
1855 lane.lane, lane.active, lane.pending, lane.handler_mode
1856 )
1857 })
1858 .collect::<Vec<_>>();
1859 lanes.sort();
1860 lines.extend(lanes);
1861 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1862 }
1863
1864 let external_tasks = queue.pending_external_tasks().await;
1865 if !external_tasks.is_empty() {
1866 let mut lines = external_tasks
1867 .into_iter()
1868 .take(6)
1869 .map(|task| {
1870 let payload_summary = Self::compact_json_value(&task.payload);
1871 if payload_summary.is_empty() {
1872 format!(
1873 "- {} {:?} remaining={}ms",
1874 task.command_type,
1875 task.lane,
1876 task.remaining_ms()
1877 )
1878 } else {
1879 format!(
1880 "- {} {:?} remaining={}ms {}",
1881 task.command_type,
1882 task.lane,
1883 task.remaining_ms(),
1884 payload_summary
1885 )
1886 }
1887 })
1888 .collect::<Vec<_>>();
1889 lines.sort();
1890 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1891 }
1892 }
1893
1894 if let Some(store) = &self.session_store {
1895 if let Ok(Some(session)) = store.load(&self.session_id).await {
1896 let active_tasks = session
1897 .tasks
1898 .into_iter()
1899 .filter(|task| task.status.is_active())
1900 .take(6)
1901 .map(|task| match task.tool {
1902 Some(tool) if !tool.is_empty() => {
1903 format!("- [{}] {} ({})", task.status, task.content, tool)
1904 }
1905 _ => format!("- [{}] {}", task.status, task.content),
1906 })
1907 .collect::<Vec<_>>();
1908 if !active_tasks.is_empty() {
1909 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1910 }
1911 }
1912 }
1913
1914 sections.join("\n\n")
1915 }
1916
1917 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
1935 self.btw_with_context(question, None).await
1936 }
1937
1938 pub async fn btw_with_context(
1943 &self,
1944 question: &str,
1945 runtime_context: Option<&str>,
1946 ) -> Result<BtwResult> {
1947 let question = question.trim();
1948 if question.is_empty() {
1949 return Err(crate::error::CodeError::Session(
1950 "btw: question cannot be empty".to_string(),
1951 ));
1952 }
1953
1954 let history_snapshot = read_or_recover(&self.history).clone();
1956
1957 let mut messages = history_snapshot;
1959 let mut injected_sections = Vec::new();
1960 let session_runtime = self.build_btw_runtime_context().await;
1961 if !session_runtime.is_empty() {
1962 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1963 }
1964 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1965 injected_sections.push(format!("[host runtime context]\n{}", extra));
1966 }
1967 if !injected_sections.is_empty() {
1968 let injected_context = format!(
1969 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1970 injected_sections.join("\n\n")
1971 );
1972 messages.push(Message::user(&injected_context));
1973 }
1974 messages.push(Message::user(question));
1975
1976 let response = self
1977 .llm_client
1978 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1979 .await
1980 .map_err(|e| {
1981 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
1982 })?;
1983
1984 Ok(BtwResult {
1985 question: question.to_string(),
1986 answer: response.text(),
1987 usage: response.usage,
1988 })
1989 }
1990
1991 pub async fn send_with_attachments(
1996 &self,
1997 prompt: &str,
1998 attachments: &[crate::llm::Attachment],
1999 history: Option<&[Message]>,
2000 ) -> Result<AgentResult> {
2001 let use_internal = history.is_none();
2005 let mut effective_history = match history {
2006 Some(h) => h.to_vec(),
2007 None => read_or_recover(&self.history).clone(),
2008 };
2009 effective_history.push(Message::user_with_attachments(prompt, attachments));
2010
2011 let agent_loop = self.build_agent_loop();
2012 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2013 let runtime_state = Arc::clone(&self.active_tools);
2014 let runtime_collector = tokio::spawn(async move {
2015 while let Some(event) = runtime_rx.recv().await {
2016 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2017 }
2018 });
2019 let result = agent_loop
2020 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2021 .await?;
2022 let _ = runtime_collector.await;
2023
2024 if use_internal {
2025 *write_or_recover(&self.history) = result.messages.clone();
2026 if self.auto_save {
2027 if let Err(e) = self.save().await {
2028 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2029 }
2030 }
2031 }
2032
2033 self.clear_runtime_tracking().await;
2034
2035 Ok(result)
2036 }
2037
2038 pub async fn stream_with_attachments(
2043 &self,
2044 prompt: &str,
2045 attachments: &[crate::llm::Attachment],
2046 history: Option<&[Message]>,
2047 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2048 let (tx, rx) = mpsc::channel(256);
2049 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2050 let mut effective_history = match history {
2051 Some(h) => h.to_vec(),
2052 None => read_or_recover(&self.history).clone(),
2053 };
2054 effective_history.push(Message::user_with_attachments(prompt, attachments));
2055
2056 let agent_loop = self.build_agent_loop();
2057 let runtime_state = Arc::clone(&self.active_tools);
2058 let forwarder = tokio::spawn(async move {
2059 let mut forward_enabled = true;
2060 while let Some(event) = runtime_rx.recv().await {
2061 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2062 if forward_enabled && tx.send(event).await.is_err() {
2063 forward_enabled = false;
2064 }
2065 }
2066 });
2067 let handle = tokio::spawn(async move {
2068 let _ = agent_loop
2069 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2070 .await;
2071 });
2072 let active_tools = Arc::clone(&self.active_tools);
2073 let wrapped_handle = tokio::spawn(async move {
2074 let _ = handle.await;
2075 let _ = forwarder.await;
2076 active_tools.write().await.clear();
2077 });
2078
2079 Ok((rx, wrapped_handle))
2080 }
2081
    /// Streaming variant of [`Self::send`].
    ///
    /// Returns a receiver of agent events together with a join handle for the
    /// background work.
    ///
    /// * A slash command handled by the registry produces a short synthetic
    ///   stream: a `BtwAnswer` followed by `End` (or a single `Error`) for a
    ///   `btw` query, otherwise a `TextDelta` followed by `End` carrying the
    ///   command text.
    /// * Any other prompt runs an agent loop against `history`, or the
    ///   internal history when `history` is `None`. The returned handle
    ///   completes after the loop and the event forwarder finish, the cancel
    ///   token has been reset and active-tool tracking has been cleared.
    ///
    /// This method does not write the resulting messages back to the session
    /// history.
    pub async fn stream(
        &self,
        prompt: &str,
        history: Option<&[Message]>,
    ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
        self.ensure_cron_started();

        if CommandRegistry::is_command(prompt) {
            let ctx = self.build_command_context();
            // The registry guard is a temporary dropped at the end of this statement.
            let output = self.command_registry().dispatch(prompt, &ctx);
            if let Some(output) = output {
                let (tx, rx) = mpsc::channel(256);

                if let Some(CommandAction::BtwQuery(question)) = output.action {
                    let llm_client = self.llm_client.clone();
                    // Snapshot the history so the spawned task owns its input.
                    let history_snapshot = read_or_recover(&self.history).clone();
                    // NOTE(review): unlike `btw_with_context`, this path sends only
                    // the history snapshot plus the question; no runtime context
                    // message is injected. Confirm whether that is intended.
                    let handle = tokio::spawn(async move {
                        let mut messages = history_snapshot;
                        messages.push(Message::user(&question));
                        match llm_client
                            .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
                            .await
                        {
                            Ok(response) => {
                                let answer = response.text();
                                // Send errors are ignored: the receiver may be gone.
                                let _ = tx
                                    .send(AgentEvent::BtwAnswer {
                                        question: question.clone(),
                                        answer: answer.clone(),
                                        usage: response.usage,
                                    })
                                    .await;
                                // Usage is carried by `BtwAnswer`; `End` reports default usage.
                                let _ = tx
                                    .send(AgentEvent::End {
                                        text: answer,
                                        usage: crate::llm::TokenUsage::default(),
                                        meta: None,
                                    })
                                    .await;
                            }
                            Err(e) => {
                                let _ = tx
                                    .send(AgentEvent::Error {
                                        message: format!("btw failed: {e}"),
                                    })
                                    .await;
                            }
                        }
                    });
                    return Ok((rx, handle));
                }

                // Plain command output: emit the text once as a delta and once as the end event.
                let handle = tokio::spawn(async move {
                    let _ = tx
                        .send(AgentEvent::TextDelta {
                            text: output.text.clone(),
                        })
                        .await;
                    let _ = tx
                        .send(AgentEvent::End {
                            text: output.text.clone(),
                            usage: crate::llm::TokenUsage::default(),
                            meta: None,
                        })
                        .await;
                });
                return Ok((rx, handle));
            }
        }

        // `tx`/`rx` face the caller; `runtime_tx`/`runtime_rx` face the agent loop.
        let (tx, rx) = mpsc::channel(256);
        let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
        let agent_loop = self.build_agent_loop();
        let effective_history = match history {
            Some(h) => h.to_vec(),
            None => read_or_recover(&self.history).clone(),
        };
        // Owned copies, because the spawned task must be `'static`.
        let prompt = prompt.to_string();
        let session_id = self.session_id.clone();

        // Publish the token so `cancel()` can reach the running loop.
        let cancel_token = tokio_util::sync::CancellationToken::new();
        *self.cancel_token.lock().await = Some(cancel_token.clone());
        let token_clone = cancel_token.clone();
        let runtime_state = Arc::clone(&self.active_tools);
        // Applies each event to the active-tool map, then forwards it to the
        // caller until the caller drops the receiver.
        let forwarder = tokio::spawn(async move {
            let mut forward_enabled = true;
            while let Some(event) = runtime_rx.recv().await {
                AgentSession::apply_runtime_event(&runtime_state, &event).await;
                if forward_enabled && tx.send(event).await.is_err() {
                    forward_enabled = false;
                }
            }
        });

        // The loop's own result is discarded here.
        let handle = tokio::spawn(async move {
            let _ = agent_loop
                .execute_with_session(
                    &effective_history,
                    &prompt,
                    Some(&session_id),
                    Some(runtime_tx),
                    Some(&token_clone),
                )
                .await;
        });

        let cancel_token_ref = self.cancel_token.clone();
        let active_tools = Arc::clone(&self.active_tools);
        // Cleanup task: wait for the loop and the forwarder, then reset the
        // cancel token and clear active-tool tracking.
        let wrapped_handle = tokio::spawn(async move {
            let _ = handle.await;
            let _ = forwarder.await;
            *cancel_token_ref.lock().await = None;
            active_tools.write().await.clear();
        });

        Ok((rx, wrapped_handle))
    }
2213
2214 pub async fn cancel(&self) -> bool {
2221 let token = self.cancel_token.lock().await.clone();
2222 if let Some(token) = token {
2223 token.cancel();
2224 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2225 true
2226 } else {
2227 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2228 false
2229 }
2230 }
2231
2232 pub fn history(&self) -> Vec<Message> {
2234 read_or_recover(&self.history).clone()
2235 }
2236
2237 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2239 self.memory.as_ref()
2240 }
2241
2242 pub fn id(&self) -> &str {
2244 &self.session_id
2245 }
2246
2247 pub fn workspace(&self) -> &std::path::Path {
2249 &self.workspace
2250 }
2251
2252 pub fn init_warning(&self) -> Option<&str> {
2254 self.init_warning.as_deref()
2255 }
2256
2257 pub fn session_id(&self) -> &str {
2259 &self.session_id
2260 }
2261
2262 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2268 self.tool_executor.definitions()
2269 }
2270
2271 pub fn tool_names(&self) -> Vec<String> {
2277 self.tool_executor
2278 .definitions()
2279 .into_iter()
2280 .map(|t| t.name)
2281 .collect()
2282 }
2283
2284 pub fn task_manager(&self) -> &Arc<TaskManager> {
2293 &self.task_manager
2294 }
2295
2296 pub fn spawn_task(&self, task: crate::task::Task) -> crate::task::TaskId {
2311 self.task_manager.spawn(task)
2312 }
2313
2314 pub fn track_tool_call(&self, tool_name: &str, args_summary: &str, success: bool) {
2318 if let Ok(mut guard) = self.progress_tracker.try_write() {
2319 guard.track_tool_call(tool_name, args_summary, success);
2320 }
2321 }
2322
2323 pub async fn get_progress(&self) -> crate::task::AgentProgress {
2327 self.progress_tracker.read().await.progress()
2328 }
2329
2330 pub fn subscribe_tasks(
2334 &self,
2335 task_id: crate::task::TaskId,
2336 ) -> Option<tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent>> {
2337 self.task_manager.subscribe(task_id)
2338 }
2339
2340 pub fn subscribe_all_tasks(
2342 &self,
2343 ) -> tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent> {
2344 self.task_manager.subscribe_all()
2345 }
2346
2347 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2353 self.hook_engine.register(hook);
2354 }
2355
2356 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2358 self.hook_engine.unregister(hook_id)
2359 }
2360
2361 pub fn register_hook_handler(
2363 &self,
2364 hook_id: &str,
2365 handler: Arc<dyn crate::hooks::HookHandler>,
2366 ) {
2367 self.hook_engine.register_handler(hook_id, handler);
2368 }
2369
2370 pub fn unregister_hook_handler(&self, hook_id: &str) {
2372 self.hook_engine.unregister_handler(hook_id);
2373 }
2374
2375 pub fn hook_count(&self) -> usize {
2377 self.hook_engine.hook_count()
2378 }
2379
2380 pub async fn save(&self) -> Result<()> {
2384 let store = match &self.session_store {
2385 Some(s) => s,
2386 None => return Ok(()),
2387 };
2388
2389 let history = read_or_recover(&self.history).clone();
2390 let now = chrono::Utc::now().timestamp();
2391
2392 let data = crate::store::SessionData {
2393 id: self.session_id.clone(),
2394 config: crate::session::SessionConfig {
2395 name: String::new(),
2396 workspace: self.workspace.display().to_string(),
2397 system_prompt: Some(self.config.prompt_slots.build()),
2398 max_context_length: 200_000,
2399 auto_compact: false,
2400 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
2401 storage_type: crate::config::StorageBackend::File,
2402 queue_config: None,
2403 confirmation_policy: None,
2404 permission_policy: None,
2405 parent_id: None,
2406 security_config: None,
2407 hook_engine: None,
2408 planning_mode: self.config.planning_mode,
2409 goal_tracking: self.config.goal_tracking,
2410 },
2411 state: crate::session::SessionState::Active,
2412 messages: history,
2413 context_usage: crate::session::ContextUsage::default(),
2414 total_usage: crate::llm::TokenUsage::default(),
2415 total_cost: 0.0,
2416 model_name: None,
2417 cost_records: Vec::new(),
2418 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
2419 thinking_enabled: false,
2420 thinking_budget: None,
2421 created_at: now,
2422 updated_at: now,
2423 llm_config: None,
2424 tasks: Vec::new(),
2425 parent_id: None,
2426 };
2427
2428 store.save(&data).await?;
2429 tracing::debug!("Session {} saved", self.session_id);
2430 Ok(())
2431 }
2432
2433 pub async fn read_file(&self, path: &str) -> Result<String> {
2435 let args = serde_json::json!({ "file_path": path });
2436 let result = self.tool_executor.execute("read", &args).await?;
2437 Ok(result.output)
2438 }
2439
2440 pub async fn bash(&self, command: &str) -> Result<String> {
2445 let args = serde_json::json!({ "command": command });
2446 let result = self
2447 .tool_executor
2448 .execute_with_context("bash", &args, &self.tool_context)
2449 .await?;
2450 Ok(result.output)
2451 }
2452
2453 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2455 let args = serde_json::json!({ "pattern": pattern });
2456 let result = self.tool_executor.execute("glob", &args).await?;
2457 let files: Vec<String> = result
2458 .output
2459 .lines()
2460 .filter(|l| !l.is_empty())
2461 .map(|l| l.to_string())
2462 .collect();
2463 Ok(files)
2464 }
2465
2466 pub async fn grep(&self, pattern: &str) -> Result<String> {
2468 let args = serde_json::json!({ "pattern": pattern });
2469 let result = self.tool_executor.execute("grep", &args).await?;
2470 Ok(result.output)
2471 }
2472
2473 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2475 let result = self.tool_executor.execute(name, &args).await?;
2476 Ok(ToolCallResult {
2477 name: name.to_string(),
2478 output: result.output,
2479 exit_code: result.exit_code,
2480 metadata: result.metadata,
2481 })
2482 }
2483
2484 pub fn has_queue(&self) -> bool {
2490 self.command_queue.is_some()
2491 }
2492
2493 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2497 if let Some(ref queue) = self.command_queue {
2498 queue.set_lane_handler(lane, config).await;
2499 }
2500 }
2501
2502 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2506 if let Some(ref queue) = self.command_queue {
2507 queue.complete_external_task(task_id, result).await
2508 } else {
2509 false
2510 }
2511 }
2512
2513 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2515 if let Some(ref queue) = self.command_queue {
2516 queue.pending_external_tasks().await
2517 } else {
2518 Vec::new()
2519 }
2520 }
2521
2522 pub async fn queue_stats(&self) -> SessionQueueStats {
2524 if let Some(ref queue) = self.command_queue {
2525 queue.stats().await
2526 } else {
2527 SessionQueueStats::default()
2528 }
2529 }
2530
2531 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2533 if let Some(ref queue) = self.command_queue {
2534 queue.metrics_snapshot().await
2535 } else {
2536 None
2537 }
2538 }
2539
2540 pub async fn submit(
2546 &self,
2547 lane: SessionLane,
2548 command: Box<dyn crate::queue::SessionCommand>,
2549 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2550 let queue = self
2551 .command_queue
2552 .as_ref()
2553 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2554 Ok(queue.submit(lane, command).await)
2555 }
2556
2557 pub async fn submit_batch(
2564 &self,
2565 lane: SessionLane,
2566 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2567 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2568 {
2569 let queue = self
2570 .command_queue
2571 .as_ref()
2572 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2573 Ok(queue.submit_batch(lane, commands).await)
2574 }
2575
2576 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2578 if let Some(ref queue) = self.command_queue {
2579 queue.dead_letters().await
2580 } else {
2581 Vec::new()
2582 }
2583 }
2584
2585 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2598 use crate::subagent::load_agents_from_dir;
2599 let agents = load_agents_from_dir(dir);
2600 let count = agents.len();
2601 for agent in agents {
2602 tracing::info!(
2603 session_id = %self.session_id,
2604 agent = agent.name,
2605 dir = %dir.display(),
2606 "Dynamically registered agent"
2607 );
2608 self.agent_registry.register(agent);
2609 }
2610 count
2611 }
2612
2613 pub async fn add_mcp_server(
2620 &self,
2621 config: crate::mcp::McpServerConfig,
2622 ) -> crate::error::Result<usize> {
2623 let server_name = config.name.clone();
2624 self.mcp_manager.register_server(config).await;
2625 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2626 crate::error::CodeError::Tool {
2627 tool: server_name.clone(),
2628 message: format!("Failed to connect MCP server: {}", e),
2629 }
2630 })?;
2631
2632 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2633 let count = tools.len();
2634
2635 for tool in
2636 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2637 {
2638 self.tool_executor.register_dynamic_tool(tool);
2639 }
2640
2641 tracing::info!(
2642 session_id = %self.session_id,
2643 server = server_name,
2644 tools = count,
2645 "MCP server added to live session"
2646 );
2647
2648 Ok(count)
2649 }
2650
2651 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2656 self.tool_executor
2657 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2658 self.mcp_manager
2659 .disconnect(server_name)
2660 .await
2661 .map_err(|e| crate::error::CodeError::Tool {
2662 tool: server_name.to_string(),
2663 message: format!("Failed to disconnect MCP server: {}", e),
2664 })?;
2665 tracing::info!(
2666 session_id = %self.session_id,
2667 server = server_name,
2668 "MCP server removed from live session"
2669 );
2670 Ok(())
2671 }
2672
2673 pub async fn mcp_status(
2675 &self,
2676 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2677 self.mcp_manager.get_status().await
2678 }
2679}
2680
2681#[cfg(test)]
2686mod tests {
2687 use super::*;
2688 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2689 use crate::store::SessionStore;
2690
2691 #[tokio::test]
2692 async fn test_session_submit_no_queue_returns_err() {
2693 let agent = Agent::from_config(test_config()).await.unwrap();
2694 let session = agent.session(".", None).unwrap();
2695 struct Noop;
2696 #[async_trait::async_trait]
2697 impl crate::queue::SessionCommand for Noop {
2698 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2699 Ok(serde_json::json!(null))
2700 }
2701 fn command_type(&self) -> &str {
2702 "noop"
2703 }
2704 }
2705 let result: anyhow::Result<
2706 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2707 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2708 assert!(result.is_err());
2709 assert!(result.unwrap_err().to_string().contains("No queue"));
2710 }
2711
2712 #[tokio::test]
2713 async fn test_session_submit_batch_no_queue_returns_err() {
2714 let agent = Agent::from_config(test_config()).await.unwrap();
2715 let session = agent.session(".", None).unwrap();
2716 struct Noop;
2717 #[async_trait::async_trait]
2718 impl crate::queue::SessionCommand for Noop {
2719 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2720 Ok(serde_json::json!(null))
2721 }
2722 fn command_type(&self) -> &str {
2723 "noop"
2724 }
2725 }
2726 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2727 let result: anyhow::Result<
2728 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2729 > = session.submit_batch(SessionLane::Query, cmds).await;
2730 assert!(result.is_err());
2731 assert!(result.unwrap_err().to_string().contains("No queue"));
2732 }
2733
2734 fn test_config() -> CodeConfig {
2735 CodeConfig {
2736 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2737 providers: vec![
2738 ProviderConfig {
2739 name: "anthropic".to_string(),
2740 api_key: Some("test-key".to_string()),
2741 base_url: None,
2742 headers: std::collections::HashMap::new(),
2743 session_id_header: None,
2744 models: vec![ModelConfig {
2745 id: "claude-sonnet-4-20250514".to_string(),
2746 name: "Claude Sonnet 4".to_string(),
2747 family: "claude-sonnet".to_string(),
2748 api_key: None,
2749 base_url: None,
2750 headers: std::collections::HashMap::new(),
2751 session_id_header: None,
2752 attachment: false,
2753 reasoning: false,
2754 tool_call: true,
2755 temperature: true,
2756 release_date: None,
2757 modalities: ModelModalities::default(),
2758 cost: Default::default(),
2759 limit: Default::default(),
2760 }],
2761 },
2762 ProviderConfig {
2763 name: "openai".to_string(),
2764 api_key: Some("test-openai-key".to_string()),
2765 base_url: None,
2766 headers: std::collections::HashMap::new(),
2767 session_id_header: None,
2768 models: vec![ModelConfig {
2769 id: "gpt-4o".to_string(),
2770 name: "GPT-4o".to_string(),
2771 family: "gpt-4".to_string(),
2772 api_key: None,
2773 base_url: None,
2774 headers: std::collections::HashMap::new(),
2775 session_id_header: None,
2776 attachment: false,
2777 reasoning: false,
2778 tool_call: true,
2779 temperature: true,
2780 release_date: None,
2781 modalities: ModelModalities::default(),
2782 cost: Default::default(),
2783 limit: Default::default(),
2784 }],
2785 },
2786 ],
2787 ..Default::default()
2788 }
2789 }
2790
2791 fn build_effective_registry_for_test(
2792 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2793 opts: &SessionOptions,
2794 ) -> Arc<crate::skills::SkillRegistry> {
2795 let base_registry = agent_registry
2796 .as_deref()
2797 .map(|r| r.fork())
2798 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2799 if let Some(ref r) = opts.skill_registry {
2800 for skill in r.all() {
2801 base_registry.register_unchecked(skill);
2802 }
2803 }
2804 for dir in &opts.skill_dirs {
2805 if let Err(e) = base_registry.load_from_dir(dir) {
2806 tracing::warn!(
2807 dir = %dir.display(),
2808 error = %e,
2809 "Failed to load session skill dir — skipping"
2810 );
2811 }
2812 }
2813 Arc::new(base_registry)
2814 }
2815
2816 #[tokio::test]
2817 async fn test_from_config() {
2818 let agent = Agent::from_config(test_config()).await;
2819 assert!(agent.is_ok());
2820 }
2821
2822 #[tokio::test]
2823 async fn test_session_default() {
2824 let agent = Agent::from_config(test_config()).await.unwrap();
2825 let session = agent.session("/tmp/test-workspace", None);
2826 assert!(session.is_ok());
2827 let debug = format!("{:?}", session.unwrap());
2828 assert!(debug.contains("AgentSession"));
2829 }
2830
2831 #[tokio::test]
2832 async fn test_session_registers_agentic_tools_by_default() {
2833 let agent = Agent::from_config(test_config()).await.unwrap();
2834 let session = agent.session("/tmp/test-workspace", None).unwrap();
2835 let tool_names = session.tool_names();
2836
2837 assert!(tool_names.iter().any(|name| name == "agentic_search"));
2838 assert!(tool_names.iter().any(|name| name == "agentic_parse"));
2839 }
2840
2841 #[tokio::test]
2842 async fn test_session_can_disable_agentic_tools_via_config() {
2843 let mut config = test_config();
2844 config.agentic_search = Some(crate::config::AgenticSearchConfig {
2845 enabled: false,
2846 ..Default::default()
2847 });
2848 config.agentic_parse = Some(crate::config::AgenticParseConfig {
2849 enabled: false,
2850 ..Default::default()
2851 });
2852
2853 let agent = Agent::from_config(config).await.unwrap();
2854 let session = agent.session("/tmp/test-workspace", None).unwrap();
2855 let tool_names = session.tool_names();
2856
2857 assert!(!tool_names.iter().any(|name| name == "agentic_search"));
2858 assert!(!tool_names.iter().any(|name| name == "agentic_parse"));
2859 }
2860
2861 #[tokio::test]
2862 async fn test_session_with_model_override() {
2863 let agent = Agent::from_config(test_config()).await.unwrap();
2864 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2865 let session = agent.session("/tmp/test-workspace", Some(opts));
2866 assert!(session.is_ok());
2867 }
2868
2869 #[tokio::test]
2870 async fn test_session_with_invalid_model_format() {
2871 let agent = Agent::from_config(test_config()).await.unwrap();
2872 let opts = SessionOptions::new().with_model("gpt-4o");
2873 let session = agent.session("/tmp/test-workspace", Some(opts));
2874 assert!(session.is_err());
2875 }
2876
2877 #[tokio::test]
2878 async fn test_session_with_model_not_found() {
2879 let agent = Agent::from_config(test_config()).await.unwrap();
2880 let opts = SessionOptions::new().with_model("openai/nonexistent");
2881 let session = agent.session("/tmp/test-workspace", Some(opts));
2882 assert!(session.is_err());
2883 }
2884
2885 #[tokio::test]
2886 async fn test_session_preserves_skill_scorer_from_agent_registry() {
2887 use crate::skills::feedback::{
2888 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
2889 };
2890 use crate::skills::{Skill, SkillKind, SkillRegistry};
2891
2892 let registry = Arc::new(SkillRegistry::new());
2893 let scorer = Arc::new(DefaultSkillScorer::default());
2894 registry.set_scorer(scorer.clone());
2895
2896 registry.register_unchecked(Arc::new(Skill {
2897 name: "healthy-skill".to_string(),
2898 description: "healthy".to_string(),
2899 allowed_tools: None,
2900 disable_model_invocation: false,
2901 kind: SkillKind::Instruction,
2902 content: "healthy".to_string(),
2903 tags: vec![],
2904 version: None,
2905 }));
2906 registry.register_unchecked(Arc::new(Skill {
2907 name: "disabled-skill".to_string(),
2908 description: "disabled".to_string(),
2909 allowed_tools: None,
2910 disable_model_invocation: false,
2911 kind: SkillKind::Instruction,
2912 content: "disabled".to_string(),
2913 tags: vec![],
2914 version: None,
2915 }));
2916
2917 for _ in 0..5 {
2918 scorer.record(SkillFeedback {
2919 skill_name: "disabled-skill".to_string(),
2920 outcome: SkillOutcome::Failure,
2921 score_delta: -1.0,
2922 reason: "bad".to_string(),
2923 timestamp: 0,
2924 });
2925 }
2926
2927 let effective_registry =
2928 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
2929 let prompt = effective_registry.to_system_prompt();
2930
2931 assert!(prompt.contains("healthy-skill"));
2932 assert!(!prompt.contains("disabled-skill"));
2933 }
2934
2935 #[tokio::test]
2936 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
2937 use crate::skills::validator::DefaultSkillValidator;
2938 use crate::skills::SkillRegistry;
2939
2940 let registry = Arc::new(SkillRegistry::new());
2941 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
2942
2943 let temp_dir = tempfile::tempdir().unwrap();
2944 let invalid_skill = temp_dir.path().join("invalid.md");
2945 std::fs::write(
2946 &invalid_skill,
2947 r#"---
2948name: BadName
2949description: "invalid skill name"
2950kind: instruction
2951---
2952# Invalid Skill
2953"#,
2954 )
2955 .unwrap();
2956
2957 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
2958 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
2959 assert!(effective_registry.get("BadName").is_none());
2960 }
2961
2962 #[tokio::test]
2963 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
2964 use crate::skills::{Skill, SkillKind, SkillRegistry};
2965
2966 let registry = Arc::new(SkillRegistry::new());
2967 registry.register_unchecked(Arc::new(Skill {
2968 name: "shared-skill".to_string(),
2969 description: "agent level".to_string(),
2970 allowed_tools: None,
2971 disable_model_invocation: false,
2972 kind: SkillKind::Instruction,
2973 content: "agent content".to_string(),
2974 tags: vec![],
2975 version: None,
2976 }));
2977
2978 let session_registry = Arc::new(SkillRegistry::new());
2979 session_registry.register_unchecked(Arc::new(Skill {
2980 name: "shared-skill".to_string(),
2981 description: "session level".to_string(),
2982 allowed_tools: None,
2983 disable_model_invocation: false,
2984 kind: SkillKind::Instruction,
2985 content: "session content".to_string(),
2986 tags: vec![],
2987 version: None,
2988 }));
2989
2990 let opts = SessionOptions::new().with_skill_registry(session_registry);
2991 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
2992
2993 assert_eq!(
2994 effective_registry.get("shared-skill").unwrap().content,
2995 "session content"
2996 );
2997 assert_eq!(
2998 registry.get("shared-skill").unwrap().content,
2999 "agent content"
3000 );
3001 }
3002
3003 #[tokio::test]
3004 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3005 use crate::skills::{Skill, SkillKind, SkillRegistry};
3006
3007 let session_registry = Arc::new(SkillRegistry::new());
3008 session_registry.register_unchecked(Arc::new(Skill {
3009 name: "shared-skill".to_string(),
3010 description: "session registry".to_string(),
3011 allowed_tools: None,
3012 disable_model_invocation: false,
3013 kind: SkillKind::Instruction,
3014 content: "registry content".to_string(),
3015 tags: vec![],
3016 version: None,
3017 }));
3018
3019 let temp_dir = tempfile::tempdir().unwrap();
3020 std::fs::write(
3021 temp_dir.path().join("shared.md"),
3022 r#"---
3023name: shared-skill
3024description: "skill dir override"
3025kind: instruction
3026---
3027# Shared Skill
3028dir content
3029"#,
3030 )
3031 .unwrap();
3032 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3033
3034 let opts = SessionOptions::new()
3035 .with_skill_registry(session_registry)
3036 .with_skill_dirs([temp_dir.path()]);
3037 let effective_registry = build_effective_registry_for_test(None, &opts);
3038
3039 assert_eq!(
3040 effective_registry.get("shared-skill").unwrap().description,
3041 "skill dir override"
3042 );
3043 assert!(effective_registry.get("README").is_none());
3044 }
3045
3046 #[tokio::test]
3047 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3048 use crate::plugin::{Plugin, PluginContext};
3049 use crate::skills::{Skill, SkillKind, SkillRegistry};
3050 use crate::tools::ToolRegistry;
3051
3052 struct SessionOnlySkillPlugin;
3053
3054 impl Plugin for SessionOnlySkillPlugin {
3055 fn name(&self) -> &str {
3056 "session-only-skill"
3057 }
3058
3059 fn version(&self) -> &str {
3060 "0.1.0"
3061 }
3062
3063 fn tool_names(&self) -> &[&str] {
3064 &[]
3065 }
3066
3067 fn load(
3068 &self,
3069 _registry: &Arc<ToolRegistry>,
3070 _ctx: &PluginContext,
3071 ) -> anyhow::Result<()> {
3072 Ok(())
3073 }
3074
3075 fn skills(&self) -> Vec<Arc<Skill>> {
3076 vec![Arc::new(Skill {
3077 name: "plugin-session-skill".to_string(),
3078 description: "plugin skill".to_string(),
3079 allowed_tools: None,
3080 disable_model_invocation: false,
3081 kind: SkillKind::Instruction,
3082 content: "plugin content".to_string(),
3083 tags: vec!["plugin".to_string()],
3084 version: None,
3085 })]
3086 }
3087 }
3088
3089 let mut agent = Agent::from_config(test_config()).await.unwrap();
3090 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3091 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3092
3093 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3094 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3095
3096 let session_registry = session.config.skill_registry.as_ref().unwrap();
3097 assert!(session_registry.get("plugin-session-skill").is_some());
3098 assert!(agent_registry.get("plugin-session-skill").is_none());
3099 }
3100
3101 #[tokio::test]
3102 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3103 use crate::skills::{Skill, SkillKind, SkillRegistry};
3104
3105 let mut agent = Agent::from_config(test_config()).await.unwrap();
3106 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3107 agent.config.skill_registry = Some(agent_registry);
3108
3109 let session_registry = Arc::new(SkillRegistry::new());
3110 session_registry.register_unchecked(Arc::new(Skill {
3111 name: "session-only".to_string(),
3112 description: "only for first session".to_string(),
3113 allowed_tools: None,
3114 disable_model_invocation: false,
3115 kind: SkillKind::Instruction,
3116 content: "session one".to_string(),
3117 tags: vec![],
3118 version: None,
3119 }));
3120
3121 let session_one = agent
3122 .session(
3123 "/tmp/test-workspace",
3124 Some(SessionOptions::new().with_skill_registry(session_registry)),
3125 )
3126 .unwrap();
3127 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3128
3129 assert!(session_one
3130 .config
3131 .skill_registry
3132 .as_ref()
3133 .unwrap()
3134 .get("session-only")
3135 .is_some());
3136 assert!(session_two
3137 .config
3138 .skill_registry
3139 .as_ref()
3140 .unwrap()
3141 .get("session-only")
3142 .is_none());
3143 }
3144
3145 #[tokio::test]
3146 async fn test_plugin_skills_do_not_leak_across_sessions() {
3147 use crate::plugin::{Plugin, PluginContext};
3148 use crate::skills::{Skill, SkillKind, SkillRegistry};
3149 use crate::tools::ToolRegistry;
3150
3151 struct LeakyPlugin;
3152
3153 impl Plugin for LeakyPlugin {
3154 fn name(&self) -> &str {
3155 "leaky-plugin"
3156 }
3157
3158 fn version(&self) -> &str {
3159 "0.1.0"
3160 }
3161
3162 fn tool_names(&self) -> &[&str] {
3163 &[]
3164 }
3165
3166 fn load(
3167 &self,
3168 _registry: &Arc<ToolRegistry>,
3169 _ctx: &PluginContext,
3170 ) -> anyhow::Result<()> {
3171 Ok(())
3172 }
3173
3174 fn skills(&self) -> Vec<Arc<Skill>> {
3175 vec![Arc::new(Skill {
3176 name: "plugin-only".to_string(),
3177 description: "plugin only".to_string(),
3178 allowed_tools: None,
3179 disable_model_invocation: false,
3180 kind: SkillKind::Instruction,
3181 content: "plugin skill".to_string(),
3182 tags: vec![],
3183 version: None,
3184 })]
3185 }
3186 }
3187
3188 let mut agent = Agent::from_config(test_config()).await.unwrap();
3189 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3190
3191 let session_one = agent
3192 .session(
3193 "/tmp/test-workspace",
3194 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3195 )
3196 .unwrap();
3197 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3198
3199 assert!(session_one
3200 .config
3201 .skill_registry
3202 .as_ref()
3203 .unwrap()
3204 .get("plugin-only")
3205 .is_some());
3206 assert!(session_two
3207 .config
3208 .skill_registry
3209 .as_ref()
3210 .unwrap()
3211 .get("plugin-only")
3212 .is_none());
3213 }
3214
3215 #[tokio::test]
3216 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3217 use crate::skills::{Skill, SkillKind, SkillRegistry};
3218 use crate::subagent::AgentDefinition;
3219
3220 let mut agent = Agent::from_config(test_config()).await.unwrap();
3221 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3222
3223 let definition = AgentDefinition::new("reviewer", "Review code")
3224 .with_prompt("Agent definition prompt")
3225 .with_max_steps(7);
3226
3227 let session_registry = Arc::new(SkillRegistry::new());
3228 session_registry.register_unchecked(Arc::new(Skill {
3229 name: "agent-session-skill".to_string(),
3230 description: "agent session only".to_string(),
3231 allowed_tools: None,
3232 disable_model_invocation: false,
3233 kind: SkillKind::Instruction,
3234 content: "agent session content".to_string(),
3235 tags: vec![],
3236 version: None,
3237 }));
3238
3239 let session_one = agent
3240 .session_for_agent(
3241 "/tmp/test-workspace",
3242 &definition,
3243 Some(SessionOptions::new().with_skill_registry(session_registry)),
3244 )
3245 .unwrap();
3246 let session_two = agent
3247 .session_for_agent("/tmp/test-workspace", &definition, None)
3248 .unwrap();
3249
3250 assert_eq!(session_one.config.max_tool_rounds, 7);
3251 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3252 assert!(extra.contains("Agent definition prompt"));
3253 assert!(extra.contains("agent-session-skill"));
3254 assert!(session_one
3255 .config
3256 .skill_registry
3257 .as_ref()
3258 .unwrap()
3259 .get("agent-session-skill")
3260 .is_some());
3261 assert!(session_two
3262 .config
3263 .skill_registry
3264 .as_ref()
3265 .unwrap()
3266 .get("agent-session-skill")
3267 .is_none());
3268 }
3269
3270 #[tokio::test]
3271 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3272 ) {
3273 use crate::prompts::SystemPromptSlots;
3274 use crate::subagent::AgentDefinition;
3275
3276 let agent = Agent::from_config(test_config()).await.unwrap();
3277 let definition = AgentDefinition::new("planner", "Plan work")
3278 .with_prompt("Definition extra prompt")
3279 .with_max_steps(3);
3280
3281 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3282 style: None,
3283 role: Some("Custom role".to_string()),
3284 guidelines: None,
3285 response_style: None,
3286 extra: None,
3287 });
3288
3289 let session = agent
3290 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3291 .unwrap();
3292
3293 assert_eq!(
3294 session.config.prompt_slots.role.as_deref(),
3295 Some("Custom role")
3296 );
3297 assert!(session
3298 .config
3299 .prompt_slots
3300 .extra
3301 .as_deref()
3302 .unwrap()
3303 .contains("Definition extra prompt"));
3304 assert_eq!(session.config.max_tool_rounds, 3);
3305 }
3306
3307 #[tokio::test]
3308 async fn test_new_with_hcl_string() {
3309 let hcl = r#"
3310 default_model = "anthropic/claude-sonnet-4-20250514"
3311 providers {
3312 name = "anthropic"
3313 api_key = "test-key"
3314 models {
3315 id = "claude-sonnet-4-20250514"
3316 name = "Claude Sonnet 4"
3317 }
3318 }
3319 "#;
3320 let agent = Agent::new(hcl).await;
3321 assert!(agent.is_ok());
3322 }
3323
3324 #[tokio::test]
3325 async fn test_create_alias_hcl() {
3326 let hcl = r#"
3327 default_model = "anthropic/claude-sonnet-4-20250514"
3328 providers {
3329 name = "anthropic"
3330 api_key = "test-key"
3331 models {
3332 id = "claude-sonnet-4-20250514"
3333 name = "Claude Sonnet 4"
3334 }
3335 }
3336 "#;
3337 let agent = Agent::create(hcl).await;
3338 assert!(agent.is_ok());
3339 }
3340
3341 #[tokio::test]
3342 async fn test_create_and_new_produce_same_result() {
3343 let hcl = r#"
3344 default_model = "anthropic/claude-sonnet-4-20250514"
3345 providers {
3346 name = "anthropic"
3347 api_key = "test-key"
3348 models {
3349 id = "claude-sonnet-4-20250514"
3350 name = "Claude Sonnet 4"
3351 }
3352 }
3353 "#;
3354 let agent_new = Agent::new(hcl).await;
3355 let agent_create = Agent::create(hcl).await;
3356 assert!(agent_new.is_ok());
3357 assert!(agent_create.is_ok());
3358
3359 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3361 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3362 assert!(session_new.is_ok());
3363 assert!(session_create.is_ok());
3364 }
3365
3366 #[tokio::test]
3367 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3368 let temp_dir = tempfile::tempdir().unwrap();
3369 let config_path = temp_dir.path().join("agent.hcl");
3370 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3371
3372 let err = Agent::new(config_path.display().to_string())
3373 .await
3374 .unwrap_err();
3375 let msg = err.to_string();
3376
3377 assert!(msg.contains("Failed to load config"));
3378 assert!(msg.contains("agent.hcl"));
3379 assert!(!msg.contains("Failed to parse config as HCL string"));
3380 }
3381
3382 #[tokio::test]
3383 async fn test_new_with_missing_hcl_file_reports_not_found() {
3384 let temp_dir = tempfile::tempdir().unwrap();
3385 let missing_path = temp_dir.path().join("agent.hcl");
3386
3387 let err = Agent::new(missing_path.display().to_string())
3388 .await
3389 .unwrap_err();
3390 let msg = err.to_string();
3391
3392 assert!(msg.contains("Config file not found"));
3393 assert!(msg.contains("agent.hcl"));
3394 assert!(!msg.contains("Failed to parse config as HCL string"));
3395 }
3396
3397 #[test]
3398 fn test_from_config_requires_default_model() {
3399 let rt = tokio::runtime::Runtime::new().unwrap();
3400 let config = CodeConfig {
3401 providers: vec![ProviderConfig {
3402 name: "anthropic".to_string(),
3403 api_key: Some("test-key".to_string()),
3404 base_url: None,
3405 headers: std::collections::HashMap::new(),
3406 session_id_header: None,
3407 models: vec![],
3408 }],
3409 ..Default::default()
3410 };
3411 let result = rt.block_on(Agent::from_config(config));
3412 assert!(result.is_err());
3413 }
3414
3415 #[tokio::test]
3416 async fn test_history_empty_on_new_session() {
3417 let agent = Agent::from_config(test_config()).await.unwrap();
3418 let session = agent.session("/tmp/test-workspace", None).unwrap();
3419 assert!(session.history().is_empty());
3420 }
3421
3422 #[tokio::test]
3423 async fn test_session_options_with_agent_dir() {
3424 let opts = SessionOptions::new()
3425 .with_agent_dir("/tmp/agents")
3426 .with_agent_dir("/tmp/more-agents");
3427 assert_eq!(opts.agent_dirs.len(), 2);
3428 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3429 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3430 }
3431
3432 #[test]
3437 fn test_session_options_with_queue_config() {
3438 let qc = SessionQueueConfig::default().with_lane_features();
3439 let opts = SessionOptions::new().with_queue_config(qc.clone());
3440 assert!(opts.queue_config.is_some());
3441
3442 let config = opts.queue_config.unwrap();
3443 assert!(config.enable_dlq);
3444 assert!(config.enable_metrics);
3445 assert!(config.enable_alerts);
3446 assert_eq!(config.default_timeout_ms, Some(60_000));
3447 }
3448
3449 #[tokio::test(flavor = "multi_thread")]
3450 async fn test_session_with_queue_config() {
3451 let agent = Agent::from_config(test_config()).await.unwrap();
3452 let qc = SessionQueueConfig::default();
3453 let opts = SessionOptions::new().with_queue_config(qc);
3454 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3455 assert!(session.is_ok());
3456 let session = session.unwrap();
3457 assert!(session.has_queue());
3458 }
3459
3460 #[tokio::test]
3461 async fn test_session_without_queue_config() {
3462 let agent = Agent::from_config(test_config()).await.unwrap();
3463 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3464 assert!(!session.has_queue());
3465 }
3466
3467 #[tokio::test]
3468 async fn test_session_queue_stats_without_queue() {
3469 let agent = Agent::from_config(test_config()).await.unwrap();
3470 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3471 let stats = session.queue_stats().await;
3472 assert_eq!(stats.total_pending, 0);
3474 assert_eq!(stats.total_active, 0);
3475 }
3476
3477 #[tokio::test(flavor = "multi_thread")]
3478 async fn test_session_queue_stats_with_queue() {
3479 let agent = Agent::from_config(test_config()).await.unwrap();
3480 let qc = SessionQueueConfig::default();
3481 let opts = SessionOptions::new().with_queue_config(qc);
3482 let session = agent
3483 .session("/tmp/test-workspace-qstats", Some(opts))
3484 .unwrap();
3485 let stats = session.queue_stats().await;
3486 assert_eq!(stats.total_pending, 0);
3488 assert_eq!(stats.total_active, 0);
3489 }
3490
3491 #[tokio::test(flavor = "multi_thread")]
3492 async fn test_session_pending_external_tasks_empty() {
3493 let agent = Agent::from_config(test_config()).await.unwrap();
3494 let qc = SessionQueueConfig::default();
3495 let opts = SessionOptions::new().with_queue_config(qc);
3496 let session = agent
3497 .session("/tmp/test-workspace-ext", Some(opts))
3498 .unwrap();
3499 let tasks = session.pending_external_tasks().await;
3500 assert!(tasks.is_empty());
3501 }
3502
3503 #[tokio::test(flavor = "multi_thread")]
3504 async fn test_session_dead_letters_empty() {
3505 let agent = Agent::from_config(test_config()).await.unwrap();
3506 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3507 let opts = SessionOptions::new().with_queue_config(qc);
3508 let session = agent
3509 .session("/tmp/test-workspace-dlq", Some(opts))
3510 .unwrap();
3511 let dead = session.dead_letters().await;
3512 assert!(dead.is_empty());
3513 }
3514
3515 #[tokio::test(flavor = "multi_thread")]
3516 async fn test_session_queue_metrics_disabled() {
3517 let agent = Agent::from_config(test_config()).await.unwrap();
3518 let qc = SessionQueueConfig::default();
3520 let opts = SessionOptions::new().with_queue_config(qc);
3521 let session = agent
3522 .session("/tmp/test-workspace-nomet", Some(opts))
3523 .unwrap();
3524 let metrics = session.queue_metrics().await;
3525 assert!(metrics.is_none());
3526 }
3527
3528 #[tokio::test(flavor = "multi_thread")]
3529 async fn test_session_queue_metrics_enabled() {
3530 let agent = Agent::from_config(test_config()).await.unwrap();
3531 let qc = SessionQueueConfig::default().with_metrics();
3532 let opts = SessionOptions::new().with_queue_config(qc);
3533 let session = agent
3534 .session("/tmp/test-workspace-met", Some(opts))
3535 .unwrap();
3536 let metrics = session.queue_metrics().await;
3537 assert!(metrics.is_some());
3538 }
3539
3540 #[tokio::test(flavor = "multi_thread")]
3541 async fn test_session_set_lane_handler() {
3542 let agent = Agent::from_config(test_config()).await.unwrap();
3543 let qc = SessionQueueConfig::default();
3544 let opts = SessionOptions::new().with_queue_config(qc);
3545 let session = agent
3546 .session("/tmp/test-workspace-handler", Some(opts))
3547 .unwrap();
3548
3549 session
3551 .set_lane_handler(
3552 SessionLane::Execute,
3553 LaneHandlerConfig {
3554 mode: crate::queue::TaskHandlerMode::External,
3555 timeout_ms: 30_000,
3556 },
3557 )
3558 .await;
3559
3560 }
3563
3564 #[tokio::test(flavor = "multi_thread")]
3569 async fn test_session_has_id() {
3570 let agent = Agent::from_config(test_config()).await.unwrap();
3571 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3572 assert!(!session.session_id().is_empty());
3574 assert_eq!(session.session_id().len(), 36); }
3576
3577 #[tokio::test(flavor = "multi_thread")]
3578 async fn test_session_explicit_id() {
3579 let agent = Agent::from_config(test_config()).await.unwrap();
3580 let opts = SessionOptions::new().with_session_id("my-session-42");
3581 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3582 assert_eq!(session.session_id(), "my-session-42");
3583 }
3584
3585 #[tokio::test(flavor = "multi_thread")]
3586 async fn test_session_save_no_store() {
3587 let agent = Agent::from_config(test_config()).await.unwrap();
3588 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3589 session.save().await.unwrap();
3591 }
3592
3593 #[tokio::test(flavor = "multi_thread")]
3594 async fn test_session_save_and_load() {
3595 let store = Arc::new(crate::store::MemorySessionStore::new());
3596 let agent = Agent::from_config(test_config()).await.unwrap();
3597
3598 let opts = SessionOptions::new()
3599 .with_session_store(store.clone())
3600 .with_session_id("persist-test");
3601 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3602
3603 session.save().await.unwrap();
3605
3606 assert!(store.exists("persist-test").await.unwrap());
3608
3609 let data = store.load("persist-test").await.unwrap().unwrap();
3610 assert_eq!(data.id, "persist-test");
3611 assert!(data.messages.is_empty());
3612 }
3613
3614 #[tokio::test(flavor = "multi_thread")]
3615 async fn test_session_save_with_history() {
3616 let store = Arc::new(crate::store::MemorySessionStore::new());
3617 let agent = Agent::from_config(test_config()).await.unwrap();
3618
3619 let opts = SessionOptions::new()
3620 .with_session_store(store.clone())
3621 .with_session_id("history-test");
3622 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3623
3624 {
3626 let mut h = session.history.write().unwrap();
3627 h.push(Message::user("Hello"));
3628 h.push(Message::user("How are you?"));
3629 }
3630
3631 session.save().await.unwrap();
3632
3633 let data = store.load("history-test").await.unwrap().unwrap();
3634 assert_eq!(data.messages.len(), 2);
3635 }
3636
3637 #[tokio::test(flavor = "multi_thread")]
3638 async fn test_resume_session() {
3639 let store = Arc::new(crate::store::MemorySessionStore::new());
3640 let agent = Agent::from_config(test_config()).await.unwrap();
3641
3642 let opts = SessionOptions::new()
3644 .with_session_store(store.clone())
3645 .with_session_id("resume-test");
3646 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3647 {
3648 let mut h = session.history.write().unwrap();
3649 h.push(Message::user("What is Rust?"));
3650 h.push(Message::user("Tell me more"));
3651 }
3652 session.save().await.unwrap();
3653
3654 let opts2 = SessionOptions::new().with_session_store(store.clone());
3656 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3657
3658 assert_eq!(resumed.session_id(), "resume-test");
3659 let history = resumed.history();
3660 assert_eq!(history.len(), 2);
3661 assert_eq!(history[0].text(), "What is Rust?");
3662 }
3663
3664 #[tokio::test(flavor = "multi_thread")]
3665 async fn test_resume_session_not_found() {
3666 let store = Arc::new(crate::store::MemorySessionStore::new());
3667 let agent = Agent::from_config(test_config()).await.unwrap();
3668
3669 let opts = SessionOptions::new().with_session_store(store.clone());
3670 let result = agent.resume_session("nonexistent", opts);
3671 assert!(result.is_err());
3672 assert!(result.unwrap_err().to_string().contains("not found"));
3673 }
3674
3675 #[tokio::test(flavor = "multi_thread")]
3676 async fn test_resume_session_no_store() {
3677 let agent = Agent::from_config(test_config()).await.unwrap();
3678 let opts = SessionOptions::new();
3679 let result = agent.resume_session("any-id", opts);
3680 assert!(result.is_err());
3681 assert!(result.unwrap_err().to_string().contains("session_store"));
3682 }
3683
3684 #[tokio::test(flavor = "multi_thread")]
3685 async fn test_file_session_store_persistence() {
3686 let dir = tempfile::TempDir::new().unwrap();
3687 let store = Arc::new(
3688 crate::store::FileSessionStore::new(dir.path())
3689 .await
3690 .unwrap(),
3691 );
3692 let agent = Agent::from_config(test_config()).await.unwrap();
3693
3694 let opts = SessionOptions::new()
3696 .with_session_store(store.clone())
3697 .with_session_id("file-persist");
3698 let session = agent
3699 .session("/tmp/test-ws-file-persist", Some(opts))
3700 .unwrap();
3701 {
3702 let mut h = session.history.write().unwrap();
3703 h.push(Message::user("test message"));
3704 }
3705 session.save().await.unwrap();
3706
3707 let store2 = Arc::new(
3709 crate::store::FileSessionStore::new(dir.path())
3710 .await
3711 .unwrap(),
3712 );
3713 let data = store2.load("file-persist").await.unwrap().unwrap();
3714 assert_eq!(data.messages.len(), 1);
3715 }
3716
/// The `with_session_id` and `with_auto_save` builders must store their
/// arguments in the corresponding `SessionOptions` fields.
///
/// This is a plain synchronous test: nothing here awaits, so no async
/// runtime is started for it.
#[test]
fn test_session_options_builders() {
    let opts = SessionOptions::new()
        .with_session_id("test-id")
        .with_auto_save(true);
    assert_eq!(opts.session_id.as_deref(), Some("test-id"));
    assert!(opts.auto_save);
}
3725
/// `with_sandbox` must keep the supplied `SandboxConfig`, including the
/// non-default `image` and `memory_mb` values set here.
#[test]
fn test_session_options_with_sandbox_sets_config() {
    use crate::sandbox::SandboxConfig;

    let options = SessionOptions::new().with_sandbox(SandboxConfig {
        image: "ubuntu:22.04".into(),
        memory_mb: 1024,
        ..SandboxConfig::default()
    });

    assert!(options.sandbox_config.is_some());
    let stored = options.sandbox_config.unwrap();
    assert_eq!(stored.image, "ubuntu:22.04");
    assert_eq!(stored.memory_mb, 1024);
}
3744
/// Default-constructed options must not carry a sandbox configuration.
#[test]
fn test_session_options_default_has_no_sandbox() {
    let defaults = SessionOptions::default();
    let has_no_sandbox = defaults.sandbox_config.is_none();
    assert!(has_no_sandbox);
}
3750
/// The `Debug` rendering of `SessionOptions` must mention the
/// `sandbox_config` field when a sandbox has been configured.
///
/// This is a plain synchronous test: formatting does not await anything, so
/// no async runtime is started for it.
#[test]
fn test_session_debug_includes_sandbox_config() {
    use crate::sandbox::SandboxConfig;

    let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
    let rendered = format!("{:?}", opts);
    assert!(rendered.contains("sandbox_config"));
}
3758
3759 #[tokio::test]
3760 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3761 let agent = Agent::from_config(test_config()).await.unwrap();
3764 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3765 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3767 assert!(session.is_ok());
3768 }
3769
3770 #[tokio::test(flavor = "multi_thread")]
3775 async fn test_session_with_memory_store() {
3776 use a3s_memory::InMemoryStore;
3777 let store = Arc::new(InMemoryStore::new());
3778 let agent = Agent::from_config(test_config()).await.unwrap();
3779 let opts = SessionOptions::new().with_memory(store);
3780 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3781 assert!(session.memory().is_some());
3782 }
3783
3784 #[tokio::test(flavor = "multi_thread")]
3785 async fn test_session_without_memory_store() {
3786 let agent = Agent::from_config(test_config()).await.unwrap();
3787 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3788 assert!(session.memory().is_none());
3789 }
3790
3791 #[tokio::test(flavor = "multi_thread")]
3792 async fn test_session_memory_wired_into_config() {
3793 use a3s_memory::InMemoryStore;
3794 let store = Arc::new(InMemoryStore::new());
3795 let agent = Agent::from_config(test_config()).await.unwrap();
3796 let opts = SessionOptions::new().with_memory(store);
3797 let session = agent
3798 .session("/tmp/test-ws-mem-config", Some(opts))
3799 .unwrap();
3800 assert!(session.memory().is_some());
3802 }
3803
3804 #[tokio::test(flavor = "multi_thread")]
3805 async fn test_session_with_file_memory() {
3806 let dir = tempfile::TempDir::new().unwrap();
3807 let agent = Agent::from_config(test_config()).await.unwrap();
3808 let opts = SessionOptions::new().with_file_memory(dir.path());
3809 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3810 assert!(session.memory().is_some());
3811 }
3812
3813 #[tokio::test(flavor = "multi_thread")]
3814 async fn test_memory_remember_and_recall() {
3815 use a3s_memory::InMemoryStore;
3816 let store = Arc::new(InMemoryStore::new());
3817 let agent = Agent::from_config(test_config()).await.unwrap();
3818 let opts = SessionOptions::new().with_memory(store);
3819 let session = agent
3820 .session("/tmp/test-ws-mem-recall", Some(opts))
3821 .unwrap();
3822
3823 let memory = session.memory().unwrap();
3824 memory
3825 .remember_success("write a file", &["write".to_string()], "done")
3826 .await
3827 .unwrap();
3828
3829 let results = memory.recall_similar("write", 5).await.unwrap();
3830 assert!(!results.is_empty());
3831 let stats = memory.stats().await.unwrap();
3832 assert_eq!(stats.long_term_count, 1);
3833 }
3834
3835 #[tokio::test(flavor = "multi_thread")]
3840 async fn test_session_tool_timeout_configured() {
3841 let agent = Agent::from_config(test_config()).await.unwrap();
3842 let opts = SessionOptions::new().with_tool_timeout(5000);
3843 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3844 assert!(!session.id().is_empty());
3845 }
3846
3847 #[tokio::test(flavor = "multi_thread")]
3852 async fn test_session_without_queue_builds_ok() {
3853 let agent = Agent::from_config(test_config()).await.unwrap();
3854 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
3855 assert!(!session.id().is_empty());
3856 }
3857
3858 #[tokio::test(flavor = "multi_thread")]
3863 async fn test_concurrent_history_reads() {
3864 let agent = Agent::from_config(test_config()).await.unwrap();
3865 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
3866
3867 let handles: Vec<_> = (0..10)
3868 .map(|_| {
3869 let s = Arc::clone(&session);
3870 tokio::spawn(async move { s.history().len() })
3871 })
3872 .collect();
3873
3874 for h in handles {
3875 h.await.unwrap();
3876 }
3877 }
3878
3879 #[tokio::test(flavor = "multi_thread")]
3884 async fn test_session_no_init_warning_without_file_memory() {
3885 let agent = Agent::from_config(test_config()).await.unwrap();
3886 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
3887 assert!(session.init_warning().is_none());
3888 }
3889
3890 #[tokio::test(flavor = "multi_thread")]
3891 async fn test_register_agent_dir_loads_agents_into_live_session() {
3892 let temp_dir = tempfile::tempdir().unwrap();
3893
3894 std::fs::write(
3896 temp_dir.path().join("my-agent.yaml"),
3897 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
3898 )
3899 .unwrap();
3900
3901 let agent = Agent::from_config(test_config()).await.unwrap();
3902 let session = agent.session(".", None).unwrap();
3903
3904 assert!(!session.agent_registry.exists("my-dynamic-agent"));
3906
3907 let count = session.register_agent_dir(temp_dir.path());
3908 assert_eq!(count, 1);
3909 assert!(session.agent_registry.exists("my-dynamic-agent"));
3910 }
3911
3912 #[tokio::test(flavor = "multi_thread")]
3913 async fn test_register_agent_dir_empty_dir_returns_zero() {
3914 let temp_dir = tempfile::tempdir().unwrap();
3915 let agent = Agent::from_config(test_config()).await.unwrap();
3916 let session = agent.session(".", None).unwrap();
3917 let count = session.register_agent_dir(temp_dir.path());
3918 assert_eq!(count, 0);
3919 }
3920
3921 #[tokio::test(flavor = "multi_thread")]
3922 async fn test_register_agent_dir_nonexistent_returns_zero() {
3923 let agent = Agent::from_config(test_config()).await.unwrap();
3924 let session = agent.session(".", None).unwrap();
3925 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
3926 assert_eq!(count, 0);
3927 }
3928
3929 #[tokio::test(flavor = "multi_thread")]
3930 async fn test_session_with_mcp_manager_builds_ok() {
3931 use crate::mcp::manager::McpManager;
3932 let mcp = Arc::new(McpManager::new());
3933 let agent = Agent::from_config(test_config()).await.unwrap();
3934 let opts = SessionOptions::new().with_mcp(mcp);
3935 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
3937 assert!(!session.id().is_empty());
3938 }
3939
/// Compile-time check that `SessionCommand` is nameable from the crate root
/// and usable as a trait object; the test has no runtime assertions.
#[test]
fn test_session_command_is_pub() {
    // Naming the type is the whole test: it fails to compile if the path breaks.
    let _: std::marker::PhantomData<Box<dyn crate::SessionCommand>> = std::marker::PhantomData;
}
3946
3947 #[tokio::test(flavor = "multi_thread")]
3948 async fn test_session_submit_with_queue_executes() {
3949 let agent = Agent::from_config(test_config()).await.unwrap();
3950 let qc = SessionQueueConfig::default();
3951 let opts = SessionOptions::new().with_queue_config(qc);
3952 let session = agent
3953 .session("/tmp/test-ws-submit-exec", Some(opts))
3954 .unwrap();
3955
3956 struct Echo(serde_json::Value);
3957 #[async_trait::async_trait]
3958 impl crate::queue::SessionCommand for Echo {
3959 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
3960 Ok(self.0.clone())
3961 }
3962 fn command_type(&self) -> &str {
3963 "echo"
3964 }
3965 }
3966
3967 let rx = session
3968 .submit(
3969 SessionLane::Query,
3970 Box::new(Echo(serde_json::json!({"ok": true}))),
3971 )
3972 .await
3973 .expect("submit should succeed with queue configured");
3974
3975 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
3976 .await
3977 .expect("timed out waiting for command result")
3978 .expect("channel closed before result")
3979 .expect("command returned an error");
3980
3981 assert_eq!(result["ok"], true);
3982 }
3983}