1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::{PlanningMode, SystemPromptSlots};
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::task::{ProgressTracker, TaskManager};
35use crate::text::truncate_utf8;
36use crate::tools::{ToolContext, ToolExecutor};
37use a3s_lane::{DeadLetter, MetricsSnapshot};
38use a3s_memory::{FileMemoryStore, MemoryStore};
39use anyhow::Context;
40use std::collections::HashMap;
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::sync::{Arc, RwLock};
44use tokio::sync::{broadcast, mpsc};
45use tokio::task::JoinHandle;
46
47fn safe_canonicalize(path: &Path) -> PathBuf {
50 match std::fs::canonicalize(path) {
51 Ok(p) => strip_unc_prefix(p),
52 Err(_) => path.to_path_buf(),
53 }
54}
55
56fn strip_unc_prefix(path: PathBuf) -> PathBuf {
59 #[cfg(windows)]
60 {
61 let s = path.to_string_lossy();
62 if let Some(stripped) = s.strip_prefix(r"\\?\") {
63 return PathBuf::from(stripped);
64 }
65 }
66 path
67}
68
69#[derive(Debug, Clone)]
75pub struct ToolCallResult {
76 pub name: String,
77 pub output: String,
78 pub exit_code: i32,
79 pub metadata: Option<serde_json::Value>,
80}
81
82#[derive(Clone, Default)]
88pub struct SessionOptions {
89 pub model: Option<String>,
91 pub agent_dirs: Vec<PathBuf>,
94 pub queue_config: Option<SessionQueueConfig>,
99 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
101 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
103 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
105 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
107 pub planning_mode: PlanningMode,
109 pub goal_tracking: bool,
111 pub skill_dirs: Vec<PathBuf>,
114 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
116 pub memory_store: Option<Arc<dyn MemoryStore>>,
118 pub(crate) file_memory_dir: Option<PathBuf>,
120 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
122 pub session_id: Option<String>,
124 pub auto_save: bool,
126 pub max_parse_retries: Option<u32>,
129 pub tool_timeout_ms: Option<u64>,
132 pub circuit_breaker_threshold: Option<u32>,
136 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
144 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
150 pub auto_compact: bool,
152 pub auto_compact_threshold: Option<f32>,
155 pub continuation_enabled: Option<bool>,
158 pub max_continuation_turns: Option<u32>,
161 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
166 pub temperature: Option<f32>,
168 pub thinking_budget: Option<usize>,
170 pub max_tool_rounds: Option<usize>,
176 pub prompt_slots: Option<SystemPromptSlots>,
182 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
189 pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
198}
199
200impl std::fmt::Debug for SessionOptions {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("SessionOptions")
203 .field("model", &self.model)
204 .field("agent_dirs", &self.agent_dirs)
205 .field("skill_dirs", &self.skill_dirs)
206 .field("queue_config", &self.queue_config)
207 .field("security_provider", &self.security_provider.is_some())
208 .field("context_providers", &self.context_providers.len())
209 .field("confirmation_manager", &self.confirmation_manager.is_some())
210 .field("permission_checker", &self.permission_checker.is_some())
211 .field("planning_mode", &self.planning_mode)
212 .field("goal_tracking", &self.goal_tracking)
213 .field(
214 "skill_registry",
215 &self
216 .skill_registry
217 .as_ref()
218 .map(|r| format!("{} skills", r.len())),
219 )
220 .field("memory_store", &self.memory_store.is_some())
221 .field("session_store", &self.session_store.is_some())
222 .field("session_id", &self.session_id)
223 .field("auto_save", &self.auto_save)
224 .field("max_parse_retries", &self.max_parse_retries)
225 .field("tool_timeout_ms", &self.tool_timeout_ms)
226 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
227 .field("sandbox_config", &self.sandbox_config)
228 .field("auto_compact", &self.auto_compact)
229 .field("auto_compact_threshold", &self.auto_compact_threshold)
230 .field("continuation_enabled", &self.continuation_enabled)
231 .field("max_continuation_turns", &self.max_continuation_turns)
232 .field(
233 "plugins",
234 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
235 )
236 .field("mcp_manager", &self.mcp_manager.is_some())
237 .field("temperature", &self.temperature)
238 .field("thinking_budget", &self.thinking_budget)
239 .field("max_tool_rounds", &self.max_tool_rounds)
240 .field("prompt_slots", &self.prompt_slots.is_some())
241 .finish()
242 }
243}
244
245impl SessionOptions {
246 pub fn new() -> Self {
247 Self::default()
248 }
249
250 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
255 self.plugins.push(std::sync::Arc::new(plugin));
256 self
257 }
258
259 pub fn with_model(mut self, model: impl Into<String>) -> Self {
260 self.model = Some(model.into());
261 self
262 }
263
264 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
265 self.agent_dirs.push(dir.into());
266 self
267 }
268
269 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
270 self.queue_config = Some(config);
271 self
272 }
273
274 pub fn with_default_security(mut self) -> Self {
276 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
277 self
278 }
279
280 pub fn with_security_provider(
282 mut self,
283 provider: Arc<dyn crate::security::SecurityProvider>,
284 ) -> Self {
285 self.security_provider = Some(provider);
286 self
287 }
288
289 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
291 let config = crate::context::FileSystemContextConfig::new(root_path);
292 self.context_providers
293 .push(Arc::new(crate::context::FileSystemContextProvider::new(
294 config,
295 )));
296 self
297 }
298
299 pub fn with_context_provider(
301 mut self,
302 provider: Arc<dyn crate::context::ContextProvider>,
303 ) -> Self {
304 self.context_providers.push(provider);
305 self
306 }
307
308 pub fn with_confirmation_manager(
310 mut self,
311 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
312 ) -> Self {
313 self.confirmation_manager = Some(manager);
314 self
315 }
316
317 pub fn with_permission_checker(
319 mut self,
320 checker: Arc<dyn crate::permissions::PermissionChecker>,
321 ) -> Self {
322 self.permission_checker = Some(checker);
323 self
324 }
325
326 pub fn with_permissive_policy(self) -> Self {
333 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
334 }
335
336 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
338 self.planning_mode = mode;
339 self
340 }
341
342 pub fn with_planning(mut self, enabled: bool) -> Self {
344 self.planning_mode = if enabled {
345 PlanningMode::Enabled
346 } else {
347 PlanningMode::Disabled
348 };
349 self
350 }
351
352 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
354 self.goal_tracking = enabled;
355 self
356 }
357
358 pub fn with_builtin_skills(mut self) -> Self {
360 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
361 self
362 }
363
364 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
366 self.skill_registry = Some(registry);
367 self
368 }
369
370 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
373 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
374 self
375 }
376
377 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
379 let registry = self
380 .skill_registry
381 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
382 if let Err(e) = registry.load_from_dir(&dir) {
383 tracing::warn!(
384 dir = %dir.as_ref().display(),
385 error = %e,
386 "Failed to load skills from directory — continuing without them"
387 );
388 }
389 self.skill_registry = Some(registry);
390 self
391 }
392
393 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
395 self.memory_store = Some(store);
396 self
397 }
398
399 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
405 self.file_memory_dir = Some(dir.into());
406 self
407 }
408
409 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
411 self.session_store = Some(store);
412 self
413 }
414
415 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
417 let dir = dir.into();
418 match tokio::runtime::Handle::try_current() {
419 Ok(handle) => {
420 match tokio::task::block_in_place(|| {
421 handle.block_on(crate::store::FileSessionStore::new(dir))
422 }) {
423 Ok(store) => {
424 self.session_store =
425 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
426 }
427 Err(e) => {
428 tracing::warn!("Failed to create file session store: {}", e);
429 }
430 }
431 }
432 Err(_) => {
433 tracing::warn!(
434 "No async runtime available for file session store — persistence disabled"
435 );
436 }
437 }
438 self
439 }
440
441 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
443 self.session_id = Some(id.into());
444 self
445 }
446
447 pub fn with_auto_save(mut self, enabled: bool) -> Self {
449 self.auto_save = enabled;
450 self
451 }
452
453 pub fn with_parse_retries(mut self, max: u32) -> Self {
459 self.max_parse_retries = Some(max);
460 self
461 }
462
463 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
469 self.tool_timeout_ms = Some(timeout_ms);
470 self
471 }
472
473 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
479 self.circuit_breaker_threshold = Some(threshold);
480 self
481 }
482
483 pub fn with_resilience_defaults(self) -> Self {
489 self.with_parse_retries(2)
490 .with_tool_timeout(120_000)
491 .with_circuit_breaker(3)
492 }
493
494 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
513 self.sandbox_config = Some(config);
514 self
515 }
516
517 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
525 self.sandbox_handle = Some(handle);
526 self
527 }
528
529 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
534 self.auto_compact = enabled;
535 self
536 }
537
538 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
540 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
541 self
542 }
543
544 pub fn with_continuation(mut self, enabled: bool) -> Self {
549 self.continuation_enabled = Some(enabled);
550 self
551 }
552
553 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
555 self.max_continuation_turns = Some(turns);
556 self
557 }
558
559 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
564 self.mcp_manager = Some(manager);
565 self
566 }
567
568 pub fn with_temperature(mut self, temperature: f32) -> Self {
569 self.temperature = Some(temperature);
570 self
571 }
572
573 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
574 self.thinking_budget = Some(budget);
575 self
576 }
577
578 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
583 self.max_tool_rounds = Some(rounds);
584 self
585 }
586
587 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
592 self.prompt_slots = Some(slots);
593 self
594 }
595
596 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
602 self.hook_executor = Some(executor);
603 self
604 }
605}
606
607pub struct Agent {
616 llm_client: Arc<dyn LlmClient>,
617 code_config: CodeConfig,
618 config: AgentConfig,
619 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
621 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
624}
625
626impl std::fmt::Debug for Agent {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 f.debug_struct("Agent").finish()
629 }
630}
631
632impl Agent {
633 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
637 let source = config_source.into();
638
639 let expanded = if let Some(rest) = source.strip_prefix("~/") {
641 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
642 if let Some(home) = home {
643 PathBuf::from(home).join(rest).display().to_string()
644 } else {
645 source.clone()
646 }
647 } else {
648 source.clone()
649 };
650
651 let path = Path::new(&expanded);
652
653 let config = if matches!(
654 path.extension().and_then(|ext| ext.to_str()),
655 Some("hcl" | "json")
656 ) {
657 if !path.exists() {
658 return Err(CodeError::Config(format!(
659 "Config file not found: {}",
660 path.display()
661 )));
662 }
663
664 CodeConfig::from_file(path)
665 .with_context(|| format!("Failed to load config: {}", path.display()))?
666 } else {
667 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
669 };
670
671 Self::from_config(config).await
672 }
673
674 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
679 Self::new(config_source).await
680 }
681
682 pub async fn from_config(config: CodeConfig) -> Result<Self> {
684 let llm_config = config
685 .default_llm_config()
686 .context("default_model must be set in 'provider/model' format with a valid API key")?;
687 let llm_client = crate::llm::create_client_with_config(llm_config);
688
689 let agent_config = AgentConfig {
690 max_tool_rounds: config
691 .max_tool_rounds
692 .unwrap_or(AgentConfig::default().max_tool_rounds),
693 ..AgentConfig::default()
694 };
695
696 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
698 (None, vec![])
699 } else {
700 let manager = Arc::new(crate::mcp::manager::McpManager::new());
701 for server in &config.mcp_servers {
702 if !server.enabled {
703 continue;
704 }
705 manager.register_server(server.clone()).await;
706 if let Err(e) = manager.connect(&server.name).await {
707 tracing::warn!(
708 server = %server.name,
709 error = %e,
710 "Failed to connect to MCP server — skipping"
711 );
712 }
713 }
714 let tools = manager.get_all_tools().await;
716 (Some(manager), tools)
717 };
718
719 let mut agent = Agent {
720 llm_client,
721 code_config: config,
722 config: agent_config,
723 global_mcp,
724 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
725 };
726
727 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
729 for dir in &agent.code_config.skill_dirs.clone() {
730 if let Err(e) = registry.load_from_dir(dir) {
731 tracing::warn!(
732 dir = %dir.display(),
733 error = %e,
734 "Failed to load skills from directory — skipping"
735 );
736 }
737 }
738 agent.config.skill_registry = Some(registry);
739
740 Ok(agent)
741 }
742
743 pub async fn refresh_mcp_tools(&self) -> Result<()> {
751 if let Some(ref mcp) = self.global_mcp {
752 let fresh = mcp.get_all_tools().await;
753 *self
754 .global_mcp_tools
755 .lock()
756 .expect("global_mcp_tools lock poisoned") = fresh;
757 }
758 Ok(())
759 }
760
761 pub fn session(
766 &self,
767 workspace: impl Into<String>,
768 options: Option<SessionOptions>,
769 ) -> Result<AgentSession> {
770 let opts = options.unwrap_or_default();
771
772 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
775 (Some(global), Some(session)) => {
776 let global = Arc::clone(global);
777 let session_mgr = Arc::clone(session);
778 match tokio::runtime::Handle::try_current() {
779 Ok(handle) => {
780 let global_for_merge = Arc::clone(&global);
781 tokio::task::block_in_place(|| {
782 handle.block_on(async move {
783 for config in session_mgr.all_configs().await {
784 let name = config.name.clone();
785 global_for_merge.register_server(config).await;
786 if let Err(e) = global_for_merge.connect(&name).await {
787 tracing::warn!(
788 server = %name,
789 error = %e,
790 "Failed to connect session-level MCP server — skipping"
791 );
792 }
793 }
794 })
795 });
796 }
797 Err(_) => {
798 tracing::warn!(
799 "No async runtime available to merge session-level MCP servers \
800 into global manager — session MCP servers will not be available"
801 );
802 }
803 }
804 SessionOptions {
805 mcp_manager: Some(Arc::clone(&global)),
806 ..opts
807 }
808 }
809 (Some(global), None) => SessionOptions {
810 mcp_manager: Some(Arc::clone(global)),
811 ..opts
812 },
813 _ => opts,
814 };
815
816 let session_id = merged_opts
817 .session_id
818 .clone()
819 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
820 merged_opts.session_id = Some(session_id.clone());
821 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
822
823 self.build_session(workspace.into(), llm_client, &merged_opts)
824 }
825
826 pub fn session_for_agent(
841 &self,
842 workspace: impl Into<String>,
843 def: &crate::subagent::AgentDefinition,
844 extra: Option<SessionOptions>,
845 ) -> Result<AgentSession> {
846 let mut opts = extra.unwrap_or_default();
847
848 if opts.permission_checker.is_none()
850 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
851 {
852 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
853 }
854
855 if opts.max_tool_rounds.is_none() {
857 if let Some(steps) = def.max_steps {
858 opts.max_tool_rounds = Some(steps);
859 }
860 }
861
862 if opts.model.is_none() {
864 if let Some(ref m) = def.model {
865 let provider = m.provider.as_deref().unwrap_or("anthropic");
866 opts.model = Some(format!("{}/{}", provider, m.model));
867 }
868 }
869
870 if let Some(ref prompt) = def.prompt {
877 let slots = opts
878 .prompt_slots
879 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
880 if slots.extra.is_none() {
881 slots.extra = Some(prompt.clone());
882 }
883 }
884
885 self.session(workspace, Some(opts))
886 }
887
888 pub fn resume_session(
896 &self,
897 session_id: &str,
898 options: SessionOptions,
899 ) -> Result<AgentSession> {
900 let store = options.session_store.as_ref().ok_or_else(|| {
901 crate::error::CodeError::Session(
902 "resume_session requires a session_store in SessionOptions".to_string(),
903 )
904 })?;
905
906 let data = match tokio::runtime::Handle::try_current() {
908 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
909 .map_err(|e| {
910 crate::error::CodeError::Session(format!(
911 "Failed to load session {}: {}",
912 session_id, e
913 ))
914 })?,
915 Err(_) => {
916 return Err(crate::error::CodeError::Session(
917 "No async runtime available for session resume".to_string(),
918 ))
919 }
920 };
921
922 let data = data.ok_or_else(|| {
923 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
924 })?;
925
926 let mut opts = options;
928 opts.session_id = Some(data.id.clone());
929 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
930
931 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
932
933 *write_or_recover(&session.history) = data.messages;
935
936 Ok(session)
937 }
938
939 fn resolve_session_llm_client(
940 &self,
941 opts: &SessionOptions,
942 session_id: Option<&str>,
943 ) -> Result<Arc<dyn LlmClient>> {
944 let model_ref = if let Some(ref model) = opts.model {
945 model.as_str()
946 } else {
947 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
948 tracing::warn!(
949 "temperature/thinking_budget set without model override — these will be ignored. \
950 Use with_model() to apply LLM parameter overrides."
951 );
952 }
953 self.code_config
954 .default_model
955 .as_deref()
956 .context("default_model must be set in 'provider/model' format")?
957 };
958
959 let (provider_name, model_id) = model_ref
960 .split_once('/')
961 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
962
963 let mut llm_config = self
964 .code_config
965 .llm_config(provider_name, model_id)
966 .with_context(|| {
967 format!("provider '{provider_name}' or model '{model_id}' not found in config")
968 })?;
969
970 if opts.model.is_some() {
971 if let Some(temp) = opts.temperature {
972 llm_config = llm_config.with_temperature(temp);
973 }
974 if let Some(budget) = opts.thinking_budget {
975 llm_config = llm_config.with_thinking_budget(budget);
976 }
977 }
978
979 if let Some(session_id) = session_id {
980 llm_config = llm_config.with_session_id(session_id);
981 }
982
983 Ok(crate::llm::create_client_with_config(llm_config))
984 }
985
986 fn build_session(
987 &self,
988 workspace: String,
989 llm_client: Arc<dyn LlmClient>,
990 opts: &SessionOptions,
991 ) -> Result<AgentSession> {
992 let canonical = safe_canonicalize(Path::new(&workspace));
993
994 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
995
996 if let Some(ref search_config) = self.code_config.search {
998 tool_executor
999 .registry()
1000 .set_search_config(search_config.clone());
1001 }
1002
1003 let agent_registry = {
1007 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1008 use crate::tools::register_task_with_mcp;
1009 let registry = AgentRegistry::new();
1010 for dir in self
1011 .code_config
1012 .agent_dirs
1013 .iter()
1014 .chain(opts.agent_dirs.iter())
1015 {
1016 for agent in load_agents_from_dir(dir) {
1017 registry.register(agent);
1018 }
1019 }
1020 let registry = Arc::new(registry);
1021 register_task_with_mcp(
1022 tool_executor.registry(),
1023 Arc::clone(&llm_client),
1024 Arc::clone(®istry),
1025 canonical.display().to_string(),
1026 opts.mcp_manager.clone(),
1027 );
1028 registry
1029 };
1030
1031 if let Some(ref mcp) = opts.mcp_manager {
1034 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1037 Arc::as_ptr(mcp),
1038 self.global_mcp
1039 .as_ref()
1040 .map(Arc::as_ptr)
1041 .unwrap_or(std::ptr::null()),
1042 ) {
1043 self.global_mcp_tools
1045 .lock()
1046 .expect("global_mcp_tools lock poisoned")
1047 .clone()
1048 } else {
1049 match tokio::runtime::Handle::try_current() {
1051 Ok(handle) => {
1052 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1053 }
1054 Err(_) => {
1055 tracing::warn!(
1056 "No async runtime available for session-level MCP tools — \
1057 MCP tools will not be registered"
1058 );
1059 vec![]
1060 }
1061 }
1062 };
1063
1064 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1065 std::collections::HashMap::new();
1066 for (server, tool) in all_tools {
1067 by_server.entry(server).or_default().push(tool);
1068 }
1069 for (server_name, tools) in by_server {
1070 for tool in
1071 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1072 {
1073 tool_executor.register_dynamic_tool(tool);
1074 }
1075 }
1076 }
1077
1078 let tool_defs = tool_executor.definitions();
1079
1080 let mut prompt_slots = opts
1082 .prompt_slots
1083 .clone()
1084 .unwrap_or_else(|| self.config.prompt_slots.clone());
1085
1086 let agents_md_path = canonical.join("AGENTS.md");
1088 if agents_md_path.exists() && agents_md_path.is_file() {
1089 match std::fs::read_to_string(&agents_md_path) {
1090 Ok(content) if !content.trim().is_empty() => {
1091 tracing::info!(
1092 path = %agents_md_path.display(),
1093 "Auto-loaded AGENTS.md from workspace root"
1094 );
1095 prompt_slots.extra = match prompt_slots.extra {
1096 Some(existing) => Some(format!(
1097 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1098 existing, content
1099 )),
1100 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1101 };
1102 }
1103 Ok(_) => {
1104 tracing::debug!(
1105 path = %agents_md_path.display(),
1106 "AGENTS.md exists but is empty — skipping"
1107 );
1108 }
1109 Err(e) => {
1110 tracing::warn!(
1111 path = %agents_md_path.display(),
1112 error = %e,
1113 "Failed to read AGENTS.md — skipping"
1114 );
1115 }
1116 }
1117 }
1118
1119 let base_registry = self
1123 .config
1124 .skill_registry
1125 .as_deref()
1126 .map(|r| r.fork())
1127 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1128 if let Some(ref r) = opts.skill_registry {
1130 for skill in r.all() {
1131 base_registry.register_unchecked(skill);
1132 }
1133 }
1134 for dir in &opts.skill_dirs {
1136 if let Err(e) = base_registry.load_from_dir(dir) {
1137 tracing::warn!(
1138 dir = %dir.display(),
1139 error = %e,
1140 "Failed to load session skill dir — skipping"
1141 );
1142 }
1143 }
1144 let effective_registry = Arc::new(base_registry);
1145
1146 if !opts.plugins.is_empty() {
1149 use crate::plugin::PluginContext;
1150 let plugin_ctx = PluginContext::new()
1151 .with_llm(Arc::clone(&self.llm_client))
1152 .with_skill_registry(Arc::clone(&effective_registry));
1153 let plugin_registry = tool_executor.registry();
1154 for plugin in &opts.plugins {
1155 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1156 match plugin.load(plugin_registry, &plugin_ctx) {
1157 Ok(()) => {
1158 for skill in plugin.skills() {
1159 tracing::debug!(
1160 "Plugin '{}' registered skill '{}'",
1161 plugin.name(),
1162 skill.name
1163 );
1164 effective_registry.register_unchecked(skill);
1165 }
1166 }
1167 Err(e) => {
1168 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1169 }
1170 }
1171 }
1172 }
1173
1174 let skill_prompt = effective_registry.to_system_prompt();
1176 if !skill_prompt.is_empty() {
1177 prompt_slots.extra = match prompt_slots.extra {
1178 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1179 None => Some(skill_prompt),
1180 };
1181 }
1182
1183 let mut init_warning: Option<String> = None;
1185 let memory = {
1186 let store = if let Some(ref store) = opts.memory_store {
1187 Some(Arc::clone(store))
1188 } else if let Some(ref dir) = opts.file_memory_dir {
1189 match tokio::runtime::Handle::try_current() {
1190 Ok(handle) => {
1191 let dir = dir.clone();
1192 match tokio::task::block_in_place(|| {
1193 handle.block_on(FileMemoryStore::new(dir))
1194 }) {
1195 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1196 Err(e) => {
1197 let msg = format!("Failed to create file memory store: {}", e);
1198 tracing::warn!("{}", msg);
1199 init_warning = Some(msg);
1200 None
1201 }
1202 }
1203 }
1204 Err(_) => {
1205 let msg =
1206 "No async runtime available for file memory store — memory disabled"
1207 .to_string();
1208 tracing::warn!("{}", msg);
1209 init_warning = Some(msg);
1210 None
1211 }
1212 }
1213 } else {
1214 None
1215 };
1216 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1217 };
1218
1219 let base = self.config.clone();
1220 let config = AgentConfig {
1221 prompt_slots,
1222 tools: tool_defs,
1223 security_provider: opts.security_provider.clone(),
1224 permission_checker: opts.permission_checker.clone(),
1225 confirmation_manager: opts.confirmation_manager.clone(),
1226 context_providers: opts.context_providers.clone(),
1227 planning_mode: opts.planning_mode,
1228 goal_tracking: opts.goal_tracking,
1229 skill_registry: Some(Arc::clone(&effective_registry)),
1230 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1231 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1232 circuit_breaker_threshold: opts
1233 .circuit_breaker_threshold
1234 .unwrap_or(base.circuit_breaker_threshold),
1235 auto_compact: opts.auto_compact,
1236 auto_compact_threshold: opts
1237 .auto_compact_threshold
1238 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1239 max_context_tokens: base.max_context_tokens,
1240 llm_client: Some(Arc::clone(&llm_client)),
1241 memory: memory.clone(),
1242 continuation_enabled: opts
1243 .continuation_enabled
1244 .unwrap_or(base.continuation_enabled),
1245 max_continuation_turns: opts
1246 .max_continuation_turns
1247 .unwrap_or(base.max_continuation_turns),
1248 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1249 ..base
1250 };
1251
1252 {
1256 use crate::tools::register_skill;
1257 register_skill(
1258 tool_executor.registry(),
1259 Arc::clone(&llm_client),
1260 Arc::clone(&effective_registry),
1261 Arc::clone(&tool_executor),
1262 config.clone(),
1263 );
1264 }
1265
1266 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1269 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1270 let session_id = uuid::Uuid::new_v4().to_string();
1271 let rt = tokio::runtime::Handle::try_current();
1272
1273 match rt {
1274 Ok(handle) => {
1275 let queue = tokio::task::block_in_place(|| {
1277 handle.block_on(SessionLaneQueue::new(
1278 &session_id,
1279 queue_config.clone(),
1280 agent_event_tx.clone(),
1281 ))
1282 });
1283 match queue {
1284 Ok(q) => {
1285 let q = Arc::new(q);
1287 let q2 = Arc::clone(&q);
1288 tokio::task::block_in_place(|| {
1289 handle.block_on(async { q2.start().await.ok() })
1290 });
1291 Some(q)
1292 }
1293 Err(e) => {
1294 tracing::warn!("Failed to create session lane queue: {}", e);
1295 None
1296 }
1297 }
1298 }
1299 Err(_) => {
1300 tracing::warn!(
1301 "No async runtime available for queue creation — queue disabled"
1302 );
1303 None
1304 }
1305 }
1306 } else {
1307 None
1308 };
1309
1310 let mut tool_context = ToolContext::new(canonical.clone());
1312 if let Some(ref search_config) = self.code_config.search {
1313 tool_context = tool_context.with_search_config(search_config.clone());
1314 }
1315 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1316
1317 if let Some(handle) = opts.sandbox_handle.clone() {
1319 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1320 tool_context = tool_context.with_sandbox(handle);
1321 } else if opts.sandbox_config.is_some() {
1322 tracing::warn!(
1323 "sandbox_config is set but no sandbox_handle was provided \
1324 — bash commands will run locally"
1325 );
1326 }
1327
1328 let session_id = opts
1329 .session_id
1330 .clone()
1331 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1332
1333 let session_store = if opts.session_store.is_some() {
1335 opts.session_store.clone()
1336 } else if let Some(ref dir) = self.code_config.sessions_dir {
1337 match tokio::runtime::Handle::try_current() {
1338 Ok(handle) => {
1339 let dir = dir.clone();
1340 match tokio::task::block_in_place(|| {
1341 handle.block_on(crate::store::FileSessionStore::new(dir))
1342 }) {
1343 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1344 Err(e) => {
1345 tracing::warn!(
1346 "Failed to create session store from sessions_dir: {}",
1347 e
1348 );
1349 None
1350 }
1351 }
1352 }
1353 Err(_) => {
1354 tracing::warn!(
1355 "No async runtime for sessions_dir store — persistence disabled"
1356 );
1357 None
1358 }
1359 }
1360 } else {
1361 None
1362 };
1363
1364 let (cron_scheduler, cron_rx) = CronScheduler::new();
1368 let mut command_registry = CommandRegistry::new();
1369 command_registry.register(Arc::new(LoopCommand {
1370 scheduler: Arc::clone(&cron_scheduler),
1371 }));
1372 command_registry.register(Arc::new(CronListCommand {
1373 scheduler: Arc::clone(&cron_scheduler),
1374 }));
1375 command_registry.register(Arc::new(CronCancelCommand {
1376 scheduler: Arc::clone(&cron_scheduler),
1377 }));
1378
1379 Ok(AgentSession {
1380 llm_client,
1381 tool_executor,
1382 tool_context,
1383 memory: config.memory.clone(),
1384 config,
1385 workspace: canonical,
1386 session_id,
1387 history: RwLock::new(Vec::new()),
1388 command_queue,
1389 session_store,
1390 auto_save: opts.auto_save,
1391 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1392 ahp_executor: opts.hook_executor.clone(),
1393 init_warning,
1394 command_registry: std::sync::Mutex::new(command_registry),
1395 model_name: opts
1396 .model
1397 .clone()
1398 .or_else(|| self.code_config.default_model.clone())
1399 .unwrap_or_else(|| "unknown".to_string()),
1400 mcp_manager: opts
1401 .mcp_manager
1402 .clone()
1403 .or_else(|| self.global_mcp.clone())
1404 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1405 agent_registry,
1406 cron_scheduler,
1407 cron_rx: tokio::sync::Mutex::new(cron_rx),
1408 is_processing_cron: AtomicBool::new(false),
1409 cron_started: AtomicBool::new(false),
1410 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1411 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1412 task_manager: Arc::new(TaskManager::new()),
1413 progress_tracker: Arc::new(tokio::sync::RwLock::new(ProgressTracker::new(30))),
1414 })
1415 }
1416}
1417
1418#[derive(Debug, Clone)]
1427pub struct BtwResult {
1428 pub question: String,
1430 pub answer: String,
1432 pub usage: crate::llm::TokenUsage,
1434}
1435
1436pub struct AgentSession {
1445 llm_client: Arc<dyn LlmClient>,
1446 tool_executor: Arc<ToolExecutor>,
1447 tool_context: ToolContext,
1448 config: AgentConfig,
1449 workspace: PathBuf,
1450 session_id: String,
1452 history: RwLock<Vec<Message>>,
1454 command_queue: Option<Arc<SessionLaneQueue>>,
1456 memory: Option<Arc<crate::memory::AgentMemory>>,
1458 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1460 auto_save: bool,
1462 hook_engine: Arc<crate::hooks::HookEngine>,
1464 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1467 init_warning: Option<String>,
1469 command_registry: std::sync::Mutex<CommandRegistry>,
1472 model_name: String,
1474 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1476 agent_registry: Arc<crate::subagent::AgentRegistry>,
1478 cron_scheduler: Arc<CronScheduler>,
1480 cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
1482 is_processing_cron: AtomicBool,
1484 cron_started: AtomicBool,
1488 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1491 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1493 task_manager: Arc<TaskManager>,
1495 progress_tracker: Arc<tokio::sync::RwLock<ProgressTracker>>,
1497}
1498
1499#[derive(Debug, Clone)]
1500struct ActiveToolSnapshot {
1501 tool_name: String,
1502 started_at_ms: u64,
1503}
1504
1505impl std::fmt::Debug for AgentSession {
1506 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1507 f.debug_struct("AgentSession")
1508 .field("session_id", &self.session_id)
1509 .field("workspace", &self.workspace.display().to_string())
1510 .field("auto_save", &self.auto_save)
1511 .finish()
1512 }
1513}
1514
1515impl AgentSession {
1516 fn now_ms() -> u64 {
1517 std::time::SystemTime::now()
1518 .duration_since(std::time::UNIX_EPOCH)
1519 .map(|d| d.as_millis() as u64)
1520 .unwrap_or(0)
1521 }
1522
1523 fn compact_json_value(value: &serde_json::Value) -> String {
1524 let raw = match value {
1525 serde_json::Value::Null => String::new(),
1526 serde_json::Value::String(s) => s.clone(),
1527 _ => serde_json::to_string(value).unwrap_or_default(),
1528 };
1529 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1530 if compact.len() > 180 {
1531 format!("{}...", truncate_utf8(&compact, 180))
1532 } else {
1533 compact
1534 }
1535 }
1536
1537 async fn apply_runtime_event(
1538 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1539 event: &AgentEvent,
1540 ) {
1541 match event {
1542 AgentEvent::ToolStart { id, name } => {
1543 active_tools.write().await.insert(
1544 id.clone(),
1545 ActiveToolSnapshot {
1546 tool_name: name.clone(),
1547 started_at_ms: Self::now_ms(),
1548 },
1549 );
1550 }
1551 AgentEvent::ToolEnd { id, .. }
1552 | AgentEvent::PermissionDenied { tool_id: id, .. }
1553 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1554 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1555 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1556 active_tools.write().await.remove(id);
1557 }
1558 _ => {}
1559 }
1560 }
1561
1562 async fn clear_runtime_tracking(&self) {
1563 self.active_tools.write().await.clear();
1564 }
1565
1566 fn build_agent_loop(&self) -> AgentLoop {
1570 let mut config = self.config.clone();
1571 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1572 ahp.clone()
1573 } else {
1574 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1575 });
1576 config.tools = self.tool_executor.definitions();
1580 let mut agent_loop = AgentLoop::new(
1581 self.llm_client.clone(),
1582 self.tool_executor.clone(),
1583 self.tool_context.clone(),
1584 config,
1585 );
1586 if let Some(ref queue) = self.command_queue {
1587 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1588 }
1589 agent_loop = agent_loop.with_progress_tracker(Arc::clone(&self.progress_tracker));
1590 agent_loop = agent_loop.with_task_manager(Arc::clone(&self.task_manager));
1591 agent_loop
1592 }
1593
1594 fn build_command_context(&self) -> CommandContext {
1596 let history = read_or_recover(&self.history);
1597
1598 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1600
1601 let mut mcp_map: std::collections::HashMap<String, usize> =
1603 std::collections::HashMap::new();
1604 for name in &tool_names {
1605 if let Some(rest) = name.strip_prefix("mcp__") {
1606 if let Some((server, _)) = rest.split_once("__") {
1607 *mcp_map.entry(server.to_string()).or_default() += 1;
1608 }
1609 }
1610 }
1611 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1612 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1613
1614 CommandContext {
1615 session_id: self.session_id.clone(),
1616 workspace: self.workspace.display().to_string(),
1617 model: self.model_name.clone(),
1618 history_len: history.len(),
1619 total_tokens: 0,
1620 total_cost: 0.0,
1621 tool_names,
1622 mcp_servers,
1623 }
1624 }
1625
1626 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1630 self.command_registry
1631 .lock()
1632 .expect("command_registry lock poisoned")
1633 }
1634
1635 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1639 self.command_registry
1640 .lock()
1641 .expect("command_registry lock poisoned")
1642 .register(cmd);
1643 }
1644
1645 pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
1647 &self.cron_scheduler
1648 }
1649
1650 pub async fn close(&self) {
1652 let _ = self.cancel().await;
1653 self.cron_scheduler.stop();
1654 }
1655
1656 fn ensure_cron_started(&self) {
1661 if !self.cron_started.swap(true, Ordering::Relaxed) {
1662 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1663 }
1664 }
1665
1666 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1675 self.ensure_cron_started();
1678
1679 if CommandRegistry::is_command(prompt) {
1681 let ctx = self.build_command_context();
1682 let output = self.command_registry().dispatch(prompt, &ctx);
1683 if let Some(output) = output {
1685 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1687 let result = self.btw(question).await?;
1688 return Ok(AgentResult {
1689 text: result.answer,
1690 messages: history
1691 .map(|h| h.to_vec())
1692 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1693 tool_calls_count: 0,
1694 usage: result.usage,
1695 });
1696 }
1697 return Ok(AgentResult {
1698 text: output.text,
1699 messages: history
1700 .map(|h| h.to_vec())
1701 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1702 tool_calls_count: 0,
1703 usage: crate::llm::TokenUsage::default(),
1704 });
1705 }
1706 }
1707
1708 if let Some(ref w) = self.init_warning {
1709 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1710 }
1711 let agent_loop = self.build_agent_loop();
1712 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1713 let runtime_state = Arc::clone(&self.active_tools);
1714 let runtime_collector = tokio::spawn(async move {
1715 while let Some(event) = runtime_rx.recv().await {
1716 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1717 }
1718 });
1719
1720 let use_internal = history.is_none();
1721 let effective_history = match history {
1722 Some(h) => h.to_vec(),
1723 None => read_or_recover(&self.history).clone(),
1724 };
1725
1726 let cancel_token = tokio_util::sync::CancellationToken::new();
1727 *self.cancel_token.lock().await = Some(cancel_token.clone());
1728 let result = agent_loop
1729 .execute_with_session(
1730 &effective_history,
1731 prompt,
1732 Some(&self.session_id),
1733 Some(runtime_tx),
1734 Some(&cancel_token),
1735 )
1736 .await;
1737 *self.cancel_token.lock().await = None;
1738 let _ = runtime_collector.await;
1739 let result = result?;
1740
1741 if use_internal {
1744 *write_or_recover(&self.history) = result.messages.clone();
1745
1746 if self.auto_save {
1748 if let Err(e) = self.save().await {
1749 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1750 }
1751 }
1752 }
1753
1754 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1759 let fires = {
1760 let mut rx = self.cron_rx.lock().await;
1761 let mut fires: Vec<ScheduledFire> = Vec::new();
1762 while let Ok(fire) = rx.try_recv() {
1763 fires.push(fire);
1764 }
1765 fires
1766 };
1767 for fire in fires {
1768 tracing::debug!(
1769 task_id = %fire.task_id,
1770 "Firing scheduled cron task"
1771 );
1772 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1773 tracing::warn!(
1774 task_id = %fire.task_id,
1775 "Scheduled task failed: {e}"
1776 );
1777 }
1778 }
1779 self.is_processing_cron.store(false, Ordering::Relaxed);
1780 }
1781
1782 self.clear_runtime_tracking().await;
1783
1784 Ok(result)
1785 }
1786
1787 async fn build_btw_runtime_context(&self) -> String {
1788 let mut sections = Vec::new();
1789
1790 let active_tools = {
1791 let tools = self.active_tools.read().await;
1792 let mut items = tools
1793 .iter()
1794 .map(|(tool_id, tool)| {
1795 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1796 format!(
1797 "- {} [{}] running_for={}ms",
1798 tool.tool_name, tool_id, elapsed_ms
1799 )
1800 })
1801 .collect::<Vec<_>>();
1802 items.sort();
1803 items
1804 };
1805 if !active_tools.is_empty() {
1806 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1807 }
1808
1809 if let Some(cm) = &self.config.confirmation_manager {
1810 let pending = cm.pending_confirmations().await;
1811 if !pending.is_empty() {
1812 let mut lines = pending
1813 .into_iter()
1814 .map(
1815 |PendingConfirmationInfo {
1816 tool_id,
1817 tool_name,
1818 args,
1819 remaining_ms,
1820 }| {
1821 let arg_summary = Self::compact_json_value(&args);
1822 if arg_summary.is_empty() {
1823 format!(
1824 "- {} [{}] remaining={}ms",
1825 tool_name, tool_id, remaining_ms
1826 )
1827 } else {
1828 format!(
1829 "- {} [{}] remaining={}ms {}",
1830 tool_name, tool_id, remaining_ms, arg_summary
1831 )
1832 }
1833 },
1834 )
1835 .collect::<Vec<_>>();
1836 lines.sort();
1837 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1838 }
1839 }
1840
1841 if let Some(queue) = &self.command_queue {
1842 let stats = queue.stats().await;
1843 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1844 let mut lines = vec![format!(
1845 "active={}, pending={}, external_pending={}",
1846 stats.total_active, stats.total_pending, stats.external_pending
1847 )];
1848 let mut lanes = stats
1849 .lanes
1850 .into_values()
1851 .filter(|lane| lane.active > 0 || lane.pending > 0)
1852 .map(|lane| {
1853 format!(
1854 "- {:?}: active={}, pending={}, handler={:?}",
1855 lane.lane, lane.active, lane.pending, lane.handler_mode
1856 )
1857 })
1858 .collect::<Vec<_>>();
1859 lanes.sort();
1860 lines.extend(lanes);
1861 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1862 }
1863
1864 let external_tasks = queue.pending_external_tasks().await;
1865 if !external_tasks.is_empty() {
1866 let mut lines = external_tasks
1867 .into_iter()
1868 .take(6)
1869 .map(|task| {
1870 let payload_summary = Self::compact_json_value(&task.payload);
1871 if payload_summary.is_empty() {
1872 format!(
1873 "- {} {:?} remaining={}ms",
1874 task.command_type,
1875 task.lane,
1876 task.remaining_ms()
1877 )
1878 } else {
1879 format!(
1880 "- {} {:?} remaining={}ms {}",
1881 task.command_type,
1882 task.lane,
1883 task.remaining_ms(),
1884 payload_summary
1885 )
1886 }
1887 })
1888 .collect::<Vec<_>>();
1889 lines.sort();
1890 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1891 }
1892 }
1893
1894 if let Some(store) = &self.session_store {
1895 if let Ok(Some(session)) = store.load(&self.session_id).await {
1896 let active_tasks = session
1897 .tasks
1898 .into_iter()
1899 .filter(|task| task.status.is_active())
1900 .take(6)
1901 .map(|task| match task.tool {
1902 Some(tool) if !tool.is_empty() => {
1903 format!("- [{}] {} ({})", task.status, task.content, tool)
1904 }
1905 _ => format!("- [{}] {}", task.status, task.content),
1906 })
1907 .collect::<Vec<_>>();
1908 if !active_tasks.is_empty() {
1909 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1910 }
1911 }
1912 }
1913
1914 sections.join("\n\n")
1915 }
1916
1917 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
1935 self.btw_with_context(question, None).await
1936 }
1937
1938 pub async fn btw_with_context(
1943 &self,
1944 question: &str,
1945 runtime_context: Option<&str>,
1946 ) -> Result<BtwResult> {
1947 let question = question.trim();
1948 if question.is_empty() {
1949 return Err(crate::error::CodeError::Session(
1950 "btw: question cannot be empty".to_string(),
1951 ));
1952 }
1953
1954 let history_snapshot = read_or_recover(&self.history).clone();
1956
1957 let mut messages = history_snapshot;
1959 let mut injected_sections = Vec::new();
1960 let session_runtime = self.build_btw_runtime_context().await;
1961 if !session_runtime.is_empty() {
1962 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1963 }
1964 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1965 injected_sections.push(format!("[host runtime context]\n{}", extra));
1966 }
1967 if !injected_sections.is_empty() {
1968 let injected_context = format!(
1969 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1970 injected_sections.join("\n\n")
1971 );
1972 messages.push(Message::user(&injected_context));
1973 }
1974 messages.push(Message::user(question));
1975
1976 let response = self
1977 .llm_client
1978 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1979 .await
1980 .map_err(|e| {
1981 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
1982 })?;
1983
1984 Ok(BtwResult {
1985 question: question.to_string(),
1986 answer: response.text(),
1987 usage: response.usage,
1988 })
1989 }
1990
1991 pub async fn send_with_attachments(
1996 &self,
1997 prompt: &str,
1998 attachments: &[crate::llm::Attachment],
1999 history: Option<&[Message]>,
2000 ) -> Result<AgentResult> {
2001 let use_internal = history.is_none();
2005 let mut effective_history = match history {
2006 Some(h) => h.to_vec(),
2007 None => read_or_recover(&self.history).clone(),
2008 };
2009 effective_history.push(Message::user_with_attachments(prompt, attachments));
2010
2011 let agent_loop = self.build_agent_loop();
2012 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2013 let runtime_state = Arc::clone(&self.active_tools);
2014 let runtime_collector = tokio::spawn(async move {
2015 while let Some(event) = runtime_rx.recv().await {
2016 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2017 }
2018 });
2019 let result = agent_loop
2020 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2021 .await?;
2022 let _ = runtime_collector.await;
2023
2024 if use_internal {
2025 *write_or_recover(&self.history) = result.messages.clone();
2026 if self.auto_save {
2027 if let Err(e) = self.save().await {
2028 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2029 }
2030 }
2031 }
2032
2033 self.clear_runtime_tracking().await;
2034
2035 Ok(result)
2036 }
2037
2038 pub async fn stream_with_attachments(
2043 &self,
2044 prompt: &str,
2045 attachments: &[crate::llm::Attachment],
2046 history: Option<&[Message]>,
2047 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2048 let (tx, rx) = mpsc::channel(256);
2049 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2050 let mut effective_history = match history {
2051 Some(h) => h.to_vec(),
2052 None => read_or_recover(&self.history).clone(),
2053 };
2054 effective_history.push(Message::user_with_attachments(prompt, attachments));
2055
2056 let agent_loop = self.build_agent_loop();
2057 let runtime_state = Arc::clone(&self.active_tools);
2058 let forwarder = tokio::spawn(async move {
2059 while let Some(event) = runtime_rx.recv().await {
2060 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2061 if tx.send(event).await.is_err() {
2062 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2065 break;
2066 }
2067 }
2068 });
2069 let handle = tokio::spawn(async move {
2070 let _ = agent_loop
2071 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2072 .await;
2073 });
2074 let active_tools = Arc::clone(&self.active_tools);
2075 let wrapped_handle = tokio::spawn(async move {
2076 let _ = handle.await;
2077 let _ = forwarder.await;
2078 active_tools.write().await.clear();
2079 });
2080
2081 Ok((rx, wrapped_handle))
2082 }
2083
2084 pub async fn stream(
2094 &self,
2095 prompt: &str,
2096 history: Option<&[Message]>,
2097 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2098 self.ensure_cron_started();
2099
2100 if CommandRegistry::is_command(prompt) {
2102 let ctx = self.build_command_context();
2103 let output = self.command_registry().dispatch(prompt, &ctx);
2104 if let Some(output) = output {
2106 let (tx, rx) = mpsc::channel(256);
2107
2108 if let Some(CommandAction::BtwQuery(question)) = output.action {
2110 let llm_client = self.llm_client.clone();
2112 let history_snapshot = read_or_recover(&self.history).clone();
2113 let handle = tokio::spawn(async move {
2114 let mut messages = history_snapshot;
2115 messages.push(Message::user(&question));
2116 match llm_client
2117 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2118 .await
2119 {
2120 Ok(response) => {
2121 let answer = response.text();
2122 let _ = tx
2123 .send(AgentEvent::BtwAnswer {
2124 question: question.clone(),
2125 answer: answer.clone(),
2126 usage: response.usage,
2127 })
2128 .await;
2129 let _ = tx
2130 .send(AgentEvent::End {
2131 text: answer,
2132 usage: crate::llm::TokenUsage::default(),
2133 meta: None,
2134 })
2135 .await;
2136 }
2137 Err(e) => {
2138 let _ = tx
2139 .send(AgentEvent::Error {
2140 message: format!("btw failed: {e}"),
2141 })
2142 .await;
2143 }
2144 }
2145 });
2146 return Ok((rx, handle));
2147 }
2148
2149 let handle = tokio::spawn(async move {
2150 let _ = tx
2151 .send(AgentEvent::TextDelta {
2152 text: output.text.clone(),
2153 })
2154 .await;
2155 let _ = tx
2156 .send(AgentEvent::End {
2157 text: output.text.clone(),
2158 usage: crate::llm::TokenUsage::default(),
2159 meta: None,
2160 })
2161 .await;
2162 });
2163 return Ok((rx, handle));
2164 }
2165 }
2166
2167 let (tx, rx) = mpsc::channel(256);
2168 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2169 let agent_loop = self.build_agent_loop();
2170 let effective_history = match history {
2171 Some(h) => h.to_vec(),
2172 None => read_or_recover(&self.history).clone(),
2173 };
2174 let prompt = prompt.to_string();
2175 let session_id = self.session_id.clone();
2176
2177 let cancel_token = tokio_util::sync::CancellationToken::new();
2178 *self.cancel_token.lock().await = Some(cancel_token.clone());
2179 let token_clone = cancel_token.clone();
2180 let runtime_state = Arc::clone(&self.active_tools);
2181 let forwarder = tokio::spawn(async move {
2182 while let Some(event) = runtime_rx.recv().await {
2183 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2184 if tx.send(event).await.is_err() {
2185 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2188 break;
2189 }
2190 }
2191 });
2192
2193 let handle = tokio::spawn(async move {
2194 let _ = agent_loop
2195 .execute_with_session(
2196 &effective_history,
2197 &prompt,
2198 Some(&session_id),
2199 Some(runtime_tx),
2200 Some(&token_clone),
2201 )
2202 .await;
2203 });
2204
2205 let cancel_token_ref = self.cancel_token.clone();
2207 let active_tools = Arc::clone(&self.active_tools);
2208 let wrapped_handle = tokio::spawn(async move {
2209 let _ = handle.await;
2210 let _ = forwarder.await;
2211 *cancel_token_ref.lock().await = None;
2212 active_tools.write().await.clear();
2213 });
2214
2215 Ok((rx, wrapped_handle))
2216 }
2217
2218 pub async fn cancel(&self) -> bool {
2225 let token = self.cancel_token.lock().await.clone();
2226 if let Some(token) = token {
2227 token.cancel();
2228 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2229 true
2230 } else {
2231 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2232 false
2233 }
2234 }
2235
2236 pub fn history(&self) -> Vec<Message> {
2238 read_or_recover(&self.history).clone()
2239 }
2240
2241 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2243 self.memory.as_ref()
2244 }
2245
2246 pub fn id(&self) -> &str {
2248 &self.session_id
2249 }
2250
2251 pub fn workspace(&self) -> &std::path::Path {
2253 &self.workspace
2254 }
2255
2256 pub fn init_warning(&self) -> Option<&str> {
2258 self.init_warning.as_deref()
2259 }
2260
2261 pub fn session_id(&self) -> &str {
2263 &self.session_id
2264 }
2265
2266 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2272 self.tool_executor.definitions()
2273 }
2274
2275 pub fn tool_names(&self) -> Vec<String> {
2281 self.tool_executor
2282 .definitions()
2283 .into_iter()
2284 .map(|t| t.name)
2285 .collect()
2286 }
2287
2288 pub fn task_manager(&self) -> &Arc<TaskManager> {
2297 &self.task_manager
2298 }
2299
2300 pub fn spawn_task(&self, task: crate::task::Task) -> crate::task::TaskId {
2315 self.task_manager.spawn(task)
2316 }
2317
2318 pub fn track_tool_call(&self, tool_name: &str, args_summary: &str, success: bool) {
2322 if let Ok(mut guard) = self.progress_tracker.try_write() {
2323 guard.track_tool_call(tool_name, args_summary, success);
2324 }
2325 }
2326
2327 pub async fn get_progress(&self) -> crate::task::AgentProgress {
2331 self.progress_tracker.read().await.progress()
2332 }
2333
2334 pub fn subscribe_tasks(
2338 &self,
2339 task_id: crate::task::TaskId,
2340 ) -> Option<tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent>> {
2341 self.task_manager.subscribe(task_id)
2342 }
2343
2344 pub fn subscribe_all_tasks(
2346 &self,
2347 ) -> tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent> {
2348 self.task_manager.subscribe_all()
2349 }
2350
2351 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2357 self.hook_engine.register(hook);
2358 }
2359
2360 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2362 self.hook_engine.unregister(hook_id)
2363 }
2364
2365 pub fn register_hook_handler(
2367 &self,
2368 hook_id: &str,
2369 handler: Arc<dyn crate::hooks::HookHandler>,
2370 ) {
2371 self.hook_engine.register_handler(hook_id, handler);
2372 }
2373
2374 pub fn unregister_hook_handler(&self, hook_id: &str) {
2376 self.hook_engine.unregister_handler(hook_id);
2377 }
2378
2379 pub fn hook_count(&self) -> usize {
2381 self.hook_engine.hook_count()
2382 }
2383
2384 pub async fn save(&self) -> Result<()> {
2388 let store = match &self.session_store {
2389 Some(s) => s,
2390 None => return Ok(()),
2391 };
2392
2393 let history = read_or_recover(&self.history).clone();
2394 let now = chrono::Utc::now().timestamp();
2395
2396 let data = crate::store::SessionData {
2397 id: self.session_id.clone(),
2398 config: crate::session::SessionConfig {
2399 name: String::new(),
2400 workspace: self.workspace.display().to_string(),
2401 system_prompt: Some(self.config.prompt_slots.build()),
2402 max_context_length: 200_000,
2403 auto_compact: false,
2404 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
2405 storage_type: crate::config::StorageBackend::File,
2406 queue_config: None,
2407 confirmation_policy: None,
2408 permission_policy: None,
2409 parent_id: None,
2410 security_config: None,
2411 hook_engine: None,
2412 planning_mode: self.config.planning_mode,
2413 goal_tracking: self.config.goal_tracking,
2414 },
2415 state: crate::session::SessionState::Active,
2416 messages: history,
2417 context_usage: crate::session::ContextUsage::default(),
2418 total_usage: crate::llm::TokenUsage::default(),
2419 total_cost: 0.0,
2420 model_name: None,
2421 cost_records: Vec::new(),
2422 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
2423 thinking_enabled: false,
2424 thinking_budget: None,
2425 created_at: now,
2426 updated_at: now,
2427 llm_config: None,
2428 tasks: Vec::new(),
2429 parent_id: None,
2430 };
2431
2432 store.save(&data).await?;
2433 tracing::debug!("Session {} saved", self.session_id);
2434 Ok(())
2435 }
2436
2437 pub async fn read_file(&self, path: &str) -> Result<String> {
2439 let args = serde_json::json!({ "file_path": path });
2440 let result = self.tool_executor.execute("read", &args).await?;
2441 Ok(result.output)
2442 }
2443
2444 pub async fn bash(&self, command: &str) -> Result<String> {
2449 let args = serde_json::json!({ "command": command });
2450 let result = self
2451 .tool_executor
2452 .execute_with_context("bash", &args, &self.tool_context)
2453 .await?;
2454 Ok(result.output)
2455 }
2456
2457 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2459 let args = serde_json::json!({ "pattern": pattern });
2460 let result = self.tool_executor.execute("glob", &args).await?;
2461 let files: Vec<String> = result
2462 .output
2463 .lines()
2464 .filter(|l| !l.is_empty())
2465 .map(|l| l.to_string())
2466 .collect();
2467 Ok(files)
2468 }
2469
2470 pub async fn grep(&self, pattern: &str) -> Result<String> {
2472 let args = serde_json::json!({ "pattern": pattern });
2473 let result = self.tool_executor.execute("grep", &args).await?;
2474 Ok(result.output)
2475 }
2476
2477 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2479 let result = self.tool_executor.execute(name, &args).await?;
2480 Ok(ToolCallResult {
2481 name: name.to_string(),
2482 output: result.output,
2483 exit_code: result.exit_code,
2484 metadata: result.metadata,
2485 })
2486 }
2487
2488 pub fn has_queue(&self) -> bool {
2494 self.command_queue.is_some()
2495 }
2496
2497 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2501 if let Some(ref queue) = self.command_queue {
2502 queue.set_lane_handler(lane, config).await;
2503 }
2504 }
2505
2506 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2510 if let Some(ref queue) = self.command_queue {
2511 queue.complete_external_task(task_id, result).await
2512 } else {
2513 false
2514 }
2515 }
2516
2517 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2519 if let Some(ref queue) = self.command_queue {
2520 queue.pending_external_tasks().await
2521 } else {
2522 Vec::new()
2523 }
2524 }
2525
2526 pub async fn queue_stats(&self) -> SessionQueueStats {
2528 if let Some(ref queue) = self.command_queue {
2529 queue.stats().await
2530 } else {
2531 SessionQueueStats::default()
2532 }
2533 }
2534
2535 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2537 if let Some(ref queue) = self.command_queue {
2538 queue.metrics_snapshot().await
2539 } else {
2540 None
2541 }
2542 }
2543
2544 pub async fn submit(
2550 &self,
2551 lane: SessionLane,
2552 command: Box<dyn crate::queue::SessionCommand>,
2553 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2554 let queue = self
2555 .command_queue
2556 .as_ref()
2557 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2558 Ok(queue.submit(lane, command).await)
2559 }
2560
2561 pub async fn submit_batch(
2568 &self,
2569 lane: SessionLane,
2570 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2571 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2572 {
2573 let queue = self
2574 .command_queue
2575 .as_ref()
2576 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2577 Ok(queue.submit_batch(lane, commands).await)
2578 }
2579
2580 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2582 if let Some(ref queue) = self.command_queue {
2583 queue.dead_letters().await
2584 } else {
2585 Vec::new()
2586 }
2587 }
2588
2589 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2602 use crate::subagent::load_agents_from_dir;
2603 let agents = load_agents_from_dir(dir);
2604 let count = agents.len();
2605 for agent in agents {
2606 tracing::info!(
2607 session_id = %self.session_id,
2608 agent = agent.name,
2609 dir = %dir.display(),
2610 "Dynamically registered agent"
2611 );
2612 self.agent_registry.register(agent);
2613 }
2614 count
2615 }
2616
2617 pub async fn add_mcp_server(
2624 &self,
2625 config: crate::mcp::McpServerConfig,
2626 ) -> crate::error::Result<usize> {
2627 let server_name = config.name.clone();
2628 self.mcp_manager.register_server(config).await;
2629 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2630 crate::error::CodeError::Tool {
2631 tool: server_name.clone(),
2632 message: format!("Failed to connect MCP server: {}", e),
2633 }
2634 })?;
2635
2636 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2637 let count = tools.len();
2638
2639 for tool in
2640 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2641 {
2642 self.tool_executor.register_dynamic_tool(tool);
2643 }
2644
2645 tracing::info!(
2646 session_id = %self.session_id,
2647 server = server_name,
2648 tools = count,
2649 "MCP server added to live session"
2650 );
2651
2652 Ok(count)
2653 }
2654
2655 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2660 self.tool_executor
2661 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2662 self.mcp_manager
2663 .disconnect(server_name)
2664 .await
2665 .map_err(|e| crate::error::CodeError::Tool {
2666 tool: server_name.to_string(),
2667 message: format!("Failed to disconnect MCP server: {}", e),
2668 })?;
2669 tracing::info!(
2670 session_id = %self.session_id,
2671 server = server_name,
2672 "MCP server removed from live session"
2673 );
2674 Ok(())
2675 }
2676
2677 pub async fn mcp_status(
2679 &self,
2680 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2681 self.mcp_manager.get_status().await
2682 }
2683}
2684
2685#[cfg(test)]
2690mod tests {
2691 use super::*;
2692 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2693 use crate::store::SessionStore;
2694
2695 #[tokio::test]
2696 async fn test_session_submit_no_queue_returns_err() {
2697 let agent = Agent::from_config(test_config()).await.unwrap();
2698 let session = agent.session(".", None).unwrap();
2699 struct Noop;
2700 #[async_trait::async_trait]
2701 impl crate::queue::SessionCommand for Noop {
2702 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2703 Ok(serde_json::json!(null))
2704 }
2705 fn command_type(&self) -> &str {
2706 "noop"
2707 }
2708 }
2709 let result: anyhow::Result<
2710 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2711 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2712 assert!(result.is_err());
2713 assert!(result.unwrap_err().to_string().contains("No queue"));
2714 }
2715
2716 #[tokio::test]
2717 async fn test_session_submit_batch_no_queue_returns_err() {
2718 let agent = Agent::from_config(test_config()).await.unwrap();
2719 let session = agent.session(".", None).unwrap();
2720 struct Noop;
2721 #[async_trait::async_trait]
2722 impl crate::queue::SessionCommand for Noop {
2723 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2724 Ok(serde_json::json!(null))
2725 }
2726 fn command_type(&self) -> &str {
2727 "noop"
2728 }
2729 }
2730 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2731 let result: anyhow::Result<
2732 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2733 > = session.submit_batch(SessionLane::Query, cmds).await;
2734 assert!(result.is_err());
2735 assert!(result.unwrap_err().to_string().contains("No queue"));
2736 }
2737
2738 fn test_config() -> CodeConfig {
2739 CodeConfig {
2740 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2741 providers: vec![
2742 ProviderConfig {
2743 name: "anthropic".to_string(),
2744 api_key: Some("test-key".to_string()),
2745 base_url: None,
2746 headers: std::collections::HashMap::new(),
2747 session_id_header: None,
2748 models: vec![ModelConfig {
2749 id: "claude-sonnet-4-20250514".to_string(),
2750 name: "Claude Sonnet 4".to_string(),
2751 family: "claude-sonnet".to_string(),
2752 api_key: None,
2753 base_url: None,
2754 headers: std::collections::HashMap::new(),
2755 session_id_header: None,
2756 attachment: false,
2757 reasoning: false,
2758 tool_call: true,
2759 temperature: true,
2760 release_date: None,
2761 modalities: ModelModalities::default(),
2762 cost: Default::default(),
2763 limit: Default::default(),
2764 }],
2765 },
2766 ProviderConfig {
2767 name: "openai".to_string(),
2768 api_key: Some("test-openai-key".to_string()),
2769 base_url: None,
2770 headers: std::collections::HashMap::new(),
2771 session_id_header: None,
2772 models: vec![ModelConfig {
2773 id: "gpt-4o".to_string(),
2774 name: "GPT-4o".to_string(),
2775 family: "gpt-4".to_string(),
2776 api_key: None,
2777 base_url: None,
2778 headers: std::collections::HashMap::new(),
2779 session_id_header: None,
2780 attachment: false,
2781 reasoning: false,
2782 tool_call: true,
2783 temperature: true,
2784 release_date: None,
2785 modalities: ModelModalities::default(),
2786 cost: Default::default(),
2787 limit: Default::default(),
2788 }],
2789 },
2790 ],
2791 ..Default::default()
2792 }
2793 }
2794
2795 fn build_effective_registry_for_test(
2796 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2797 opts: &SessionOptions,
2798 ) -> Arc<crate::skills::SkillRegistry> {
2799 let base_registry = agent_registry
2800 .as_deref()
2801 .map(|r| r.fork())
2802 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2803 if let Some(ref r) = opts.skill_registry {
2804 for skill in r.all() {
2805 base_registry.register_unchecked(skill);
2806 }
2807 }
2808 for dir in &opts.skill_dirs {
2809 if let Err(e) = base_registry.load_from_dir(dir) {
2810 tracing::warn!(
2811 dir = %dir.display(),
2812 error = %e,
2813 "Failed to load session skill dir — skipping"
2814 );
2815 }
2816 }
2817 Arc::new(base_registry)
2818 }
2819
2820 #[tokio::test]
2821 async fn test_from_config() {
2822 let agent = Agent::from_config(test_config()).await;
2823 assert!(agent.is_ok());
2824 }
2825
2826 #[tokio::test]
2827 async fn test_session_default() {
2828 let agent = Agent::from_config(test_config()).await.unwrap();
2829 let session = agent.session("/tmp/test-workspace", None);
2830 assert!(session.is_ok());
2831 let debug = format!("{:?}", session.unwrap());
2832 assert!(debug.contains("AgentSession"));
2833 }
2834
2835 #[tokio::test]
2836 async fn test_session_registers_agentic_tools_by_default() {
2837 let agent = Agent::from_config(test_config()).await.unwrap();
2838 let session = agent.session("/tmp/test-workspace", None).unwrap();
2839 let tool_names = session.tool_names();
2840
2841 assert!(tool_names.iter().any(|name| name == "agentic_search"));
2842 assert!(tool_names.iter().any(|name| name == "agentic_parse"));
2843 }
2844
2845 #[tokio::test]
2846 async fn test_session_can_disable_agentic_tools_via_config() {
2847 let mut config = test_config();
2848 config.agentic_search = Some(crate::config::AgenticSearchConfig {
2849 enabled: false,
2850 ..Default::default()
2851 });
2852 config.agentic_parse = Some(crate::config::AgenticParseConfig {
2853 enabled: false,
2854 ..Default::default()
2855 });
2856
2857 let agent = Agent::from_config(config).await.unwrap();
2858 let session = agent.session("/tmp/test-workspace", None).unwrap();
2859 let tool_names = session.tool_names();
2860
2861 assert!(!tool_names.iter().any(|name| name == "agentic_search"));
2862 assert!(!tool_names.iter().any(|name| name == "agentic_parse"));
2863 }
2864
2865 #[tokio::test]
2866 async fn test_session_with_model_override() {
2867 let agent = Agent::from_config(test_config()).await.unwrap();
2868 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2869 let session = agent.session("/tmp/test-workspace", Some(opts));
2870 assert!(session.is_ok());
2871 }
2872
2873 #[tokio::test]
2874 async fn test_session_with_invalid_model_format() {
2875 let agent = Agent::from_config(test_config()).await.unwrap();
2876 let opts = SessionOptions::new().with_model("gpt-4o");
2877 let session = agent.session("/tmp/test-workspace", Some(opts));
2878 assert!(session.is_err());
2879 }
2880
2881 #[tokio::test]
2882 async fn test_session_with_model_not_found() {
2883 let agent = Agent::from_config(test_config()).await.unwrap();
2884 let opts = SessionOptions::new().with_model("openai/nonexistent");
2885 let session = agent.session("/tmp/test-workspace", Some(opts));
2886 assert!(session.is_err());
2887 }
2888
2889 #[tokio::test]
2890 async fn test_session_preserves_skill_scorer_from_agent_registry() {
2891 use crate::skills::feedback::{
2892 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
2893 };
2894 use crate::skills::{Skill, SkillKind, SkillRegistry};
2895
2896 let registry = Arc::new(SkillRegistry::new());
2897 let scorer = Arc::new(DefaultSkillScorer::default());
2898 registry.set_scorer(scorer.clone());
2899
2900 registry.register_unchecked(Arc::new(Skill {
2901 name: "healthy-skill".to_string(),
2902 description: "healthy".to_string(),
2903 allowed_tools: None,
2904 disable_model_invocation: false,
2905 kind: SkillKind::Instruction,
2906 content: "healthy".to_string(),
2907 tags: vec![],
2908 version: None,
2909 }));
2910 registry.register_unchecked(Arc::new(Skill {
2911 name: "disabled-skill".to_string(),
2912 description: "disabled".to_string(),
2913 allowed_tools: None,
2914 disable_model_invocation: false,
2915 kind: SkillKind::Instruction,
2916 content: "disabled".to_string(),
2917 tags: vec![],
2918 version: None,
2919 }));
2920
2921 for _ in 0..5 {
2922 scorer.record(SkillFeedback {
2923 skill_name: "disabled-skill".to_string(),
2924 outcome: SkillOutcome::Failure,
2925 score_delta: -1.0,
2926 reason: "bad".to_string(),
2927 timestamp: 0,
2928 });
2929 }
2930
2931 let effective_registry =
2932 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
2933 let prompt = effective_registry.to_system_prompt();
2934
2935 assert!(prompt.contains("healthy-skill"));
2936 assert!(!prompt.contains("disabled-skill"));
2937 }
2938
2939 #[tokio::test]
2940 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
2941 use crate::skills::validator::DefaultSkillValidator;
2942 use crate::skills::SkillRegistry;
2943
2944 let registry = Arc::new(SkillRegistry::new());
2945 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
2946
2947 let temp_dir = tempfile::tempdir().unwrap();
2948 let invalid_skill = temp_dir.path().join("invalid.md");
2949 std::fs::write(
2950 &invalid_skill,
2951 r#"---
2952name: BadName
2953description: "invalid skill name"
2954kind: instruction
2955---
2956# Invalid Skill
2957"#,
2958 )
2959 .unwrap();
2960
2961 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
2962 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
2963 assert!(effective_registry.get("BadName").is_none());
2964 }
2965
2966 #[tokio::test]
2967 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
2968 use crate::skills::{Skill, SkillKind, SkillRegistry};
2969
2970 let registry = Arc::new(SkillRegistry::new());
2971 registry.register_unchecked(Arc::new(Skill {
2972 name: "shared-skill".to_string(),
2973 description: "agent level".to_string(),
2974 allowed_tools: None,
2975 disable_model_invocation: false,
2976 kind: SkillKind::Instruction,
2977 content: "agent content".to_string(),
2978 tags: vec![],
2979 version: None,
2980 }));
2981
2982 let session_registry = Arc::new(SkillRegistry::new());
2983 session_registry.register_unchecked(Arc::new(Skill {
2984 name: "shared-skill".to_string(),
2985 description: "session level".to_string(),
2986 allowed_tools: None,
2987 disable_model_invocation: false,
2988 kind: SkillKind::Instruction,
2989 content: "session content".to_string(),
2990 tags: vec![],
2991 version: None,
2992 }));
2993
2994 let opts = SessionOptions::new().with_skill_registry(session_registry);
2995 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
2996
2997 assert_eq!(
2998 effective_registry.get("shared-skill").unwrap().content,
2999 "session content"
3000 );
3001 assert_eq!(
3002 registry.get("shared-skill").unwrap().content,
3003 "agent content"
3004 );
3005 }
3006
3007 #[tokio::test]
3008 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3009 use crate::skills::{Skill, SkillKind, SkillRegistry};
3010
3011 let session_registry = Arc::new(SkillRegistry::new());
3012 session_registry.register_unchecked(Arc::new(Skill {
3013 name: "shared-skill".to_string(),
3014 description: "session registry".to_string(),
3015 allowed_tools: None,
3016 disable_model_invocation: false,
3017 kind: SkillKind::Instruction,
3018 content: "registry content".to_string(),
3019 tags: vec![],
3020 version: None,
3021 }));
3022
3023 let temp_dir = tempfile::tempdir().unwrap();
3024 std::fs::write(
3025 temp_dir.path().join("shared.md"),
3026 r#"---
3027name: shared-skill
3028description: "skill dir override"
3029kind: instruction
3030---
3031# Shared Skill
3032dir content
3033"#,
3034 )
3035 .unwrap();
3036 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3037
3038 let opts = SessionOptions::new()
3039 .with_skill_registry(session_registry)
3040 .with_skill_dirs([temp_dir.path()]);
3041 let effective_registry = build_effective_registry_for_test(None, &opts);
3042
3043 assert_eq!(
3044 effective_registry.get("shared-skill").unwrap().description,
3045 "skill dir override"
3046 );
3047 assert!(effective_registry.get("README").is_none());
3048 }
3049
3050 #[tokio::test]
3051 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3052 use crate::plugin::{Plugin, PluginContext};
3053 use crate::skills::{Skill, SkillKind, SkillRegistry};
3054 use crate::tools::ToolRegistry;
3055
3056 struct SessionOnlySkillPlugin;
3057
3058 impl Plugin for SessionOnlySkillPlugin {
3059 fn name(&self) -> &str {
3060 "session-only-skill"
3061 }
3062
3063 fn version(&self) -> &str {
3064 "0.1.0"
3065 }
3066
3067 fn tool_names(&self) -> &[&str] {
3068 &[]
3069 }
3070
3071 fn load(
3072 &self,
3073 _registry: &Arc<ToolRegistry>,
3074 _ctx: &PluginContext,
3075 ) -> anyhow::Result<()> {
3076 Ok(())
3077 }
3078
3079 fn skills(&self) -> Vec<Arc<Skill>> {
3080 vec![Arc::new(Skill {
3081 name: "plugin-session-skill".to_string(),
3082 description: "plugin skill".to_string(),
3083 allowed_tools: None,
3084 disable_model_invocation: false,
3085 kind: SkillKind::Instruction,
3086 content: "plugin content".to_string(),
3087 tags: vec!["plugin".to_string()],
3088 version: None,
3089 })]
3090 }
3091 }
3092
3093 let mut agent = Agent::from_config(test_config()).await.unwrap();
3094 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3095 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3096
3097 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3098 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3099
3100 let session_registry = session.config.skill_registry.as_ref().unwrap();
3101 assert!(session_registry.get("plugin-session-skill").is_some());
3102 assert!(agent_registry.get("plugin-session-skill").is_none());
3103 }
3104
3105 #[tokio::test]
3106 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3107 use crate::skills::{Skill, SkillKind, SkillRegistry};
3108
3109 let mut agent = Agent::from_config(test_config()).await.unwrap();
3110 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3111 agent.config.skill_registry = Some(agent_registry);
3112
3113 let session_registry = Arc::new(SkillRegistry::new());
3114 session_registry.register_unchecked(Arc::new(Skill {
3115 name: "session-only".to_string(),
3116 description: "only for first session".to_string(),
3117 allowed_tools: None,
3118 disable_model_invocation: false,
3119 kind: SkillKind::Instruction,
3120 content: "session one".to_string(),
3121 tags: vec![],
3122 version: None,
3123 }));
3124
3125 let session_one = agent
3126 .session(
3127 "/tmp/test-workspace",
3128 Some(SessionOptions::new().with_skill_registry(session_registry)),
3129 )
3130 .unwrap();
3131 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3132
3133 assert!(session_one
3134 .config
3135 .skill_registry
3136 .as_ref()
3137 .unwrap()
3138 .get("session-only")
3139 .is_some());
3140 assert!(session_two
3141 .config
3142 .skill_registry
3143 .as_ref()
3144 .unwrap()
3145 .get("session-only")
3146 .is_none());
3147 }
3148
3149 #[tokio::test]
3150 async fn test_plugin_skills_do_not_leak_across_sessions() {
3151 use crate::plugin::{Plugin, PluginContext};
3152 use crate::skills::{Skill, SkillKind, SkillRegistry};
3153 use crate::tools::ToolRegistry;
3154
3155 struct LeakyPlugin;
3156
3157 impl Plugin for LeakyPlugin {
3158 fn name(&self) -> &str {
3159 "leaky-plugin"
3160 }
3161
3162 fn version(&self) -> &str {
3163 "0.1.0"
3164 }
3165
3166 fn tool_names(&self) -> &[&str] {
3167 &[]
3168 }
3169
3170 fn load(
3171 &self,
3172 _registry: &Arc<ToolRegistry>,
3173 _ctx: &PluginContext,
3174 ) -> anyhow::Result<()> {
3175 Ok(())
3176 }
3177
3178 fn skills(&self) -> Vec<Arc<Skill>> {
3179 vec![Arc::new(Skill {
3180 name: "plugin-only".to_string(),
3181 description: "plugin only".to_string(),
3182 allowed_tools: None,
3183 disable_model_invocation: false,
3184 kind: SkillKind::Instruction,
3185 content: "plugin skill".to_string(),
3186 tags: vec![],
3187 version: None,
3188 })]
3189 }
3190 }
3191
3192 let mut agent = Agent::from_config(test_config()).await.unwrap();
3193 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3194
3195 let session_one = agent
3196 .session(
3197 "/tmp/test-workspace",
3198 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3199 )
3200 .unwrap();
3201 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3202
3203 assert!(session_one
3204 .config
3205 .skill_registry
3206 .as_ref()
3207 .unwrap()
3208 .get("plugin-only")
3209 .is_some());
3210 assert!(session_two
3211 .config
3212 .skill_registry
3213 .as_ref()
3214 .unwrap()
3215 .get("plugin-only")
3216 .is_none());
3217 }
3218
3219 #[tokio::test]
3220 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3221 use crate::skills::{Skill, SkillKind, SkillRegistry};
3222 use crate::subagent::AgentDefinition;
3223
3224 let mut agent = Agent::from_config(test_config()).await.unwrap();
3225 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3226
3227 let definition = AgentDefinition::new("reviewer", "Review code")
3228 .with_prompt("Agent definition prompt")
3229 .with_max_steps(7);
3230
3231 let session_registry = Arc::new(SkillRegistry::new());
3232 session_registry.register_unchecked(Arc::new(Skill {
3233 name: "agent-session-skill".to_string(),
3234 description: "agent session only".to_string(),
3235 allowed_tools: None,
3236 disable_model_invocation: false,
3237 kind: SkillKind::Instruction,
3238 content: "agent session content".to_string(),
3239 tags: vec![],
3240 version: None,
3241 }));
3242
3243 let session_one = agent
3244 .session_for_agent(
3245 "/tmp/test-workspace",
3246 &definition,
3247 Some(SessionOptions::new().with_skill_registry(session_registry)),
3248 )
3249 .unwrap();
3250 let session_two = agent
3251 .session_for_agent("/tmp/test-workspace", &definition, None)
3252 .unwrap();
3253
3254 assert_eq!(session_one.config.max_tool_rounds, 7);
3255 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3256 assert!(extra.contains("Agent definition prompt"));
3257 assert!(extra.contains("agent-session-skill"));
3258 assert!(session_one
3259 .config
3260 .skill_registry
3261 .as_ref()
3262 .unwrap()
3263 .get("agent-session-skill")
3264 .is_some());
3265 assert!(session_two
3266 .config
3267 .skill_registry
3268 .as_ref()
3269 .unwrap()
3270 .get("agent-session-skill")
3271 .is_none());
3272 }
3273
3274 #[tokio::test]
3275 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3276 ) {
3277 use crate::prompts::SystemPromptSlots;
3278 use crate::subagent::AgentDefinition;
3279
3280 let agent = Agent::from_config(test_config()).await.unwrap();
3281 let definition = AgentDefinition::new("planner", "Plan work")
3282 .with_prompt("Definition extra prompt")
3283 .with_max_steps(3);
3284
3285 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3286 style: None,
3287 role: Some("Custom role".to_string()),
3288 guidelines: None,
3289 response_style: None,
3290 extra: None,
3291 });
3292
3293 let session = agent
3294 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3295 .unwrap();
3296
3297 assert_eq!(
3298 session.config.prompt_slots.role.as_deref(),
3299 Some("Custom role")
3300 );
3301 assert!(session
3302 .config
3303 .prompt_slots
3304 .extra
3305 .as_deref()
3306 .unwrap()
3307 .contains("Definition extra prompt"));
3308 assert_eq!(session.config.max_tool_rounds, 3);
3309 }
3310
3311 #[tokio::test]
3312 async fn test_new_with_hcl_string() {
3313 let hcl = r#"
3314 default_model = "anthropic/claude-sonnet-4-20250514"
3315 providers {
3316 name = "anthropic"
3317 api_key = "test-key"
3318 models {
3319 id = "claude-sonnet-4-20250514"
3320 name = "Claude Sonnet 4"
3321 }
3322 }
3323 "#;
3324 let agent = Agent::new(hcl).await;
3325 assert!(agent.is_ok());
3326 }
3327
3328 #[tokio::test]
3329 async fn test_create_alias_hcl() {
3330 let hcl = r#"
3331 default_model = "anthropic/claude-sonnet-4-20250514"
3332 providers {
3333 name = "anthropic"
3334 api_key = "test-key"
3335 models {
3336 id = "claude-sonnet-4-20250514"
3337 name = "Claude Sonnet 4"
3338 }
3339 }
3340 "#;
3341 let agent = Agent::create(hcl).await;
3342 assert!(agent.is_ok());
3343 }
3344
3345 #[tokio::test]
3346 async fn test_create_and_new_produce_same_result() {
3347 let hcl = r#"
3348 default_model = "anthropic/claude-sonnet-4-20250514"
3349 providers {
3350 name = "anthropic"
3351 api_key = "test-key"
3352 models {
3353 id = "claude-sonnet-4-20250514"
3354 name = "Claude Sonnet 4"
3355 }
3356 }
3357 "#;
3358 let agent_new = Agent::new(hcl).await;
3359 let agent_create = Agent::create(hcl).await;
3360 assert!(agent_new.is_ok());
3361 assert!(agent_create.is_ok());
3362
3363 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3365 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3366 assert!(session_new.is_ok());
3367 assert!(session_create.is_ok());
3368 }
3369
3370 #[tokio::test]
3371 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3372 let temp_dir = tempfile::tempdir().unwrap();
3373 let config_path = temp_dir.path().join("agent.hcl");
3374 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3375
3376 let err = Agent::new(config_path.display().to_string())
3377 .await
3378 .unwrap_err();
3379 let msg = err.to_string();
3380
3381 assert!(msg.contains("Failed to load config"));
3382 assert!(msg.contains("agent.hcl"));
3383 assert!(!msg.contains("Failed to parse config as HCL string"));
3384 }
3385
3386 #[tokio::test]
3387 async fn test_new_with_missing_hcl_file_reports_not_found() {
3388 let temp_dir = tempfile::tempdir().unwrap();
3389 let missing_path = temp_dir.path().join("agent.hcl");
3390
3391 let err = Agent::new(missing_path.display().to_string())
3392 .await
3393 .unwrap_err();
3394 let msg = err.to_string();
3395
3396 assert!(msg.contains("Config file not found"));
3397 assert!(msg.contains("agent.hcl"));
3398 assert!(!msg.contains("Failed to parse config as HCL string"));
3399 }
3400
3401 #[test]
3402 fn test_from_config_requires_default_model() {
3403 let rt = tokio::runtime::Runtime::new().unwrap();
3404 let config = CodeConfig {
3405 providers: vec![ProviderConfig {
3406 name: "anthropic".to_string(),
3407 api_key: Some("test-key".to_string()),
3408 base_url: None,
3409 headers: std::collections::HashMap::new(),
3410 session_id_header: None,
3411 models: vec![],
3412 }],
3413 ..Default::default()
3414 };
3415 let result = rt.block_on(Agent::from_config(config));
3416 assert!(result.is_err());
3417 }
3418
3419 #[tokio::test]
3420 async fn test_history_empty_on_new_session() {
3421 let agent = Agent::from_config(test_config()).await.unwrap();
3422 let session = agent.session("/tmp/test-workspace", None).unwrap();
3423 assert!(session.history().is_empty());
3424 }
3425
3426 #[tokio::test]
3427 async fn test_session_options_with_agent_dir() {
3428 let opts = SessionOptions::new()
3429 .with_agent_dir("/tmp/agents")
3430 .with_agent_dir("/tmp/more-agents");
3431 assert_eq!(opts.agent_dirs.len(), 2);
3432 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3433 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3434 }
3435
3436 #[test]
3441 fn test_session_options_with_queue_config() {
3442 let qc = SessionQueueConfig::default().with_lane_features();
3443 let opts = SessionOptions::new().with_queue_config(qc.clone());
3444 assert!(opts.queue_config.is_some());
3445
3446 let config = opts.queue_config.unwrap();
3447 assert!(config.enable_dlq);
3448 assert!(config.enable_metrics);
3449 assert!(config.enable_alerts);
3450 assert_eq!(config.default_timeout_ms, Some(60_000));
3451 }
3452
3453 #[tokio::test(flavor = "multi_thread")]
3454 async fn test_session_with_queue_config() {
3455 let agent = Agent::from_config(test_config()).await.unwrap();
3456 let qc = SessionQueueConfig::default();
3457 let opts = SessionOptions::new().with_queue_config(qc);
3458 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3459 assert!(session.is_ok());
3460 let session = session.unwrap();
3461 assert!(session.has_queue());
3462 }
3463
3464 #[tokio::test]
3465 async fn test_session_without_queue_config() {
3466 let agent = Agent::from_config(test_config()).await.unwrap();
3467 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3468 assert!(!session.has_queue());
3469 }
3470
3471 #[tokio::test]
3472 async fn test_session_queue_stats_without_queue() {
3473 let agent = Agent::from_config(test_config()).await.unwrap();
3474 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3475 let stats = session.queue_stats().await;
3476 assert_eq!(stats.total_pending, 0);
3478 assert_eq!(stats.total_active, 0);
3479 }
3480
3481 #[tokio::test(flavor = "multi_thread")]
3482 async fn test_session_queue_stats_with_queue() {
3483 let agent = Agent::from_config(test_config()).await.unwrap();
3484 let qc = SessionQueueConfig::default();
3485 let opts = SessionOptions::new().with_queue_config(qc);
3486 let session = agent
3487 .session("/tmp/test-workspace-qstats", Some(opts))
3488 .unwrap();
3489 let stats = session.queue_stats().await;
3490 assert_eq!(stats.total_pending, 0);
3492 assert_eq!(stats.total_active, 0);
3493 }
3494
3495 #[tokio::test(flavor = "multi_thread")]
3496 async fn test_session_pending_external_tasks_empty() {
3497 let agent = Agent::from_config(test_config()).await.unwrap();
3498 let qc = SessionQueueConfig::default();
3499 let opts = SessionOptions::new().with_queue_config(qc);
3500 let session = agent
3501 .session("/tmp/test-workspace-ext", Some(opts))
3502 .unwrap();
3503 let tasks = session.pending_external_tasks().await;
3504 assert!(tasks.is_empty());
3505 }
3506
3507 #[tokio::test(flavor = "multi_thread")]
3508 async fn test_session_dead_letters_empty() {
3509 let agent = Agent::from_config(test_config()).await.unwrap();
3510 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3511 let opts = SessionOptions::new().with_queue_config(qc);
3512 let session = agent
3513 .session("/tmp/test-workspace-dlq", Some(opts))
3514 .unwrap();
3515 let dead = session.dead_letters().await;
3516 assert!(dead.is_empty());
3517 }
3518
3519 #[tokio::test(flavor = "multi_thread")]
3520 async fn test_session_queue_metrics_disabled() {
3521 let agent = Agent::from_config(test_config()).await.unwrap();
3522 let qc = SessionQueueConfig::default();
3524 let opts = SessionOptions::new().with_queue_config(qc);
3525 let session = agent
3526 .session("/tmp/test-workspace-nomet", Some(opts))
3527 .unwrap();
3528 let metrics = session.queue_metrics().await;
3529 assert!(metrics.is_none());
3530 }
3531
3532 #[tokio::test(flavor = "multi_thread")]
3533 async fn test_session_queue_metrics_enabled() {
3534 let agent = Agent::from_config(test_config()).await.unwrap();
3535 let qc = SessionQueueConfig::default().with_metrics();
3536 let opts = SessionOptions::new().with_queue_config(qc);
3537 let session = agent
3538 .session("/tmp/test-workspace-met", Some(opts))
3539 .unwrap();
3540 let metrics = session.queue_metrics().await;
3541 assert!(metrics.is_some());
3542 }
3543
3544 #[tokio::test(flavor = "multi_thread")]
3545 async fn test_session_set_lane_handler() {
3546 let agent = Agent::from_config(test_config()).await.unwrap();
3547 let qc = SessionQueueConfig::default();
3548 let opts = SessionOptions::new().with_queue_config(qc);
3549 let session = agent
3550 .session("/tmp/test-workspace-handler", Some(opts))
3551 .unwrap();
3552
3553 session
3555 .set_lane_handler(
3556 SessionLane::Execute,
3557 LaneHandlerConfig {
3558 mode: crate::queue::TaskHandlerMode::External,
3559 timeout_ms: 30_000,
3560 },
3561 )
3562 .await;
3563
3564 }
3567
3568 #[tokio::test(flavor = "multi_thread")]
3573 async fn test_session_has_id() {
3574 let agent = Agent::from_config(test_config()).await.unwrap();
3575 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3576 assert!(!session.session_id().is_empty());
3578 assert_eq!(session.session_id().len(), 36); }
3580
3581 #[tokio::test(flavor = "multi_thread")]
3582 async fn test_session_explicit_id() {
3583 let agent = Agent::from_config(test_config()).await.unwrap();
3584 let opts = SessionOptions::new().with_session_id("my-session-42");
3585 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3586 assert_eq!(session.session_id(), "my-session-42");
3587 }
3588
3589 #[tokio::test(flavor = "multi_thread")]
3590 async fn test_session_save_no_store() {
3591 let agent = Agent::from_config(test_config()).await.unwrap();
3592 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3593 session.save().await.unwrap();
3595 }
3596
3597 #[tokio::test(flavor = "multi_thread")]
3598 async fn test_session_save_and_load() {
3599 let store = Arc::new(crate::store::MemorySessionStore::new());
3600 let agent = Agent::from_config(test_config()).await.unwrap();
3601
3602 let opts = SessionOptions::new()
3603 .with_session_store(store.clone())
3604 .with_session_id("persist-test");
3605 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3606
3607 session.save().await.unwrap();
3609
3610 assert!(store.exists("persist-test").await.unwrap());
3612
3613 let data = store.load("persist-test").await.unwrap().unwrap();
3614 assert_eq!(data.id, "persist-test");
3615 assert!(data.messages.is_empty());
3616 }
3617
3618 #[tokio::test(flavor = "multi_thread")]
3619 async fn test_session_save_with_history() {
3620 let store = Arc::new(crate::store::MemorySessionStore::new());
3621 let agent = Agent::from_config(test_config()).await.unwrap();
3622
3623 let opts = SessionOptions::new()
3624 .with_session_store(store.clone())
3625 .with_session_id("history-test");
3626 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3627
3628 {
3630 let mut h = session.history.write().unwrap();
3631 h.push(Message::user("Hello"));
3632 h.push(Message::user("How are you?"));
3633 }
3634
3635 session.save().await.unwrap();
3636
3637 let data = store.load("history-test").await.unwrap().unwrap();
3638 assert_eq!(data.messages.len(), 2);
3639 }
3640
3641 #[tokio::test(flavor = "multi_thread")]
3642 async fn test_resume_session() {
3643 let store = Arc::new(crate::store::MemorySessionStore::new());
3644 let agent = Agent::from_config(test_config()).await.unwrap();
3645
3646 let opts = SessionOptions::new()
3648 .with_session_store(store.clone())
3649 .with_session_id("resume-test");
3650 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3651 {
3652 let mut h = session.history.write().unwrap();
3653 h.push(Message::user("What is Rust?"));
3654 h.push(Message::user("Tell me more"));
3655 }
3656 session.save().await.unwrap();
3657
3658 let opts2 = SessionOptions::new().with_session_store(store.clone());
3660 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3661
3662 assert_eq!(resumed.session_id(), "resume-test");
3663 let history = resumed.history();
3664 assert_eq!(history.len(), 2);
3665 assert_eq!(history[0].text(), "What is Rust?");
3666 }
3667
3668 #[tokio::test(flavor = "multi_thread")]
3669 async fn test_resume_session_not_found() {
3670 let store = Arc::new(crate::store::MemorySessionStore::new());
3671 let agent = Agent::from_config(test_config()).await.unwrap();
3672
3673 let opts = SessionOptions::new().with_session_store(store.clone());
3674 let result = agent.resume_session("nonexistent", opts);
3675 assert!(result.is_err());
3676 assert!(result.unwrap_err().to_string().contains("not found"));
3677 }
3678
3679 #[tokio::test(flavor = "multi_thread")]
3680 async fn test_resume_session_no_store() {
3681 let agent = Agent::from_config(test_config()).await.unwrap();
3682 let opts = SessionOptions::new();
3683 let result = agent.resume_session("any-id", opts);
3684 assert!(result.is_err());
3685 assert!(result.unwrap_err().to_string().contains("session_store"));
3686 }
3687
3688 #[tokio::test(flavor = "multi_thread")]
3689 async fn test_file_session_store_persistence() {
3690 let dir = tempfile::TempDir::new().unwrap();
3691 let store = Arc::new(
3692 crate::store::FileSessionStore::new(dir.path())
3693 .await
3694 .unwrap(),
3695 );
3696 let agent = Agent::from_config(test_config()).await.unwrap();
3697
3698 let opts = SessionOptions::new()
3700 .with_session_store(store.clone())
3701 .with_session_id("file-persist");
3702 let session = agent
3703 .session("/tmp/test-ws-file-persist", Some(opts))
3704 .unwrap();
3705 {
3706 let mut h = session.history.write().unwrap();
3707 h.push(Message::user("test message"));
3708 }
3709 session.save().await.unwrap();
3710
3711 let store2 = Arc::new(
3713 crate::store::FileSessionStore::new(dir.path())
3714 .await
3715 .unwrap(),
3716 );
3717 let data = store2.load("file-persist").await.unwrap().unwrap();
3718 assert_eq!(data.messages.len(), 1);
3719 }
3720
3721 #[tokio::test(flavor = "multi_thread")]
3722 async fn test_session_options_builders() {
3723 let opts = SessionOptions::new()
3724 .with_session_id("test-id")
3725 .with_auto_save(true);
3726 assert_eq!(opts.session_id, Some("test-id".to_string()));
3727 assert!(opts.auto_save);
3728 }
3729
3730 #[test]
3735 fn test_session_options_with_sandbox_sets_config() {
3736 use crate::sandbox::SandboxConfig;
3737 let cfg = SandboxConfig {
3738 image: "ubuntu:22.04".into(),
3739 memory_mb: 1024,
3740 ..SandboxConfig::default()
3741 };
3742 let opts = SessionOptions::new().with_sandbox(cfg);
3743 assert!(opts.sandbox_config.is_some());
3744 let sc = opts.sandbox_config.unwrap();
3745 assert_eq!(sc.image, "ubuntu:22.04");
3746 assert_eq!(sc.memory_mb, 1024);
3747 }
3748
3749 #[test]
3750 fn test_session_options_default_has_no_sandbox() {
3751 let opts = SessionOptions::default();
3752 assert!(opts.sandbox_config.is_none());
3753 }
3754
3755 #[tokio::test]
3756 async fn test_session_debug_includes_sandbox_config() {
3757 use crate::sandbox::SandboxConfig;
3758 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
3759 let debug = format!("{:?}", opts);
3760 assert!(debug.contains("sandbox_config"));
3761 }
3762
3763 #[tokio::test]
3764 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3765 let agent = Agent::from_config(test_config()).await.unwrap();
3768 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3769 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3771 assert!(session.is_ok());
3772 }
3773
3774 #[tokio::test(flavor = "multi_thread")]
3779 async fn test_session_with_memory_store() {
3780 use a3s_memory::InMemoryStore;
3781 let store = Arc::new(InMemoryStore::new());
3782 let agent = Agent::from_config(test_config()).await.unwrap();
3783 let opts = SessionOptions::new().with_memory(store);
3784 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3785 assert!(session.memory().is_some());
3786 }
3787
3788 #[tokio::test(flavor = "multi_thread")]
3789 async fn test_session_without_memory_store() {
3790 let agent = Agent::from_config(test_config()).await.unwrap();
3791 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3792 assert!(session.memory().is_none());
3793 }
3794
3795 #[tokio::test(flavor = "multi_thread")]
3796 async fn test_session_memory_wired_into_config() {
3797 use a3s_memory::InMemoryStore;
3798 let store = Arc::new(InMemoryStore::new());
3799 let agent = Agent::from_config(test_config()).await.unwrap();
3800 let opts = SessionOptions::new().with_memory(store);
3801 let session = agent
3802 .session("/tmp/test-ws-mem-config", Some(opts))
3803 .unwrap();
3804 assert!(session.memory().is_some());
3806 }
3807
3808 #[tokio::test(flavor = "multi_thread")]
3809 async fn test_session_with_file_memory() {
3810 let dir = tempfile::TempDir::new().unwrap();
3811 let agent = Agent::from_config(test_config()).await.unwrap();
3812 let opts = SessionOptions::new().with_file_memory(dir.path());
3813 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3814 assert!(session.memory().is_some());
3815 }
3816
3817 #[tokio::test(flavor = "multi_thread")]
3818 async fn test_memory_remember_and_recall() {
3819 use a3s_memory::InMemoryStore;
3820 let store = Arc::new(InMemoryStore::new());
3821 let agent = Agent::from_config(test_config()).await.unwrap();
3822 let opts = SessionOptions::new().with_memory(store);
3823 let session = agent
3824 .session("/tmp/test-ws-mem-recall", Some(opts))
3825 .unwrap();
3826
3827 let memory = session.memory().unwrap();
3828 memory
3829 .remember_success("write a file", &["write".to_string()], "done")
3830 .await
3831 .unwrap();
3832
3833 let results = memory.recall_similar("write", 5).await.unwrap();
3834 assert!(!results.is_empty());
3835 let stats = memory.stats().await.unwrap();
3836 assert_eq!(stats.long_term_count, 1);
3837 }
3838
3839 #[tokio::test(flavor = "multi_thread")]
3844 async fn test_session_tool_timeout_configured() {
3845 let agent = Agent::from_config(test_config()).await.unwrap();
3846 let opts = SessionOptions::new().with_tool_timeout(5000);
3847 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3848 assert!(!session.id().is_empty());
3849 }
3850
3851 #[tokio::test(flavor = "multi_thread")]
3856 async fn test_session_without_queue_builds_ok() {
3857 let agent = Agent::from_config(test_config()).await.unwrap();
3858 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
3859 assert!(!session.id().is_empty());
3860 }
3861
3862 #[tokio::test(flavor = "multi_thread")]
3867 async fn test_concurrent_history_reads() {
3868 let agent = Agent::from_config(test_config()).await.unwrap();
3869 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
3870
3871 let handles: Vec<_> = (0..10)
3872 .map(|_| {
3873 let s = Arc::clone(&session);
3874 tokio::spawn(async move { s.history().len() })
3875 })
3876 .collect();
3877
3878 for h in handles {
3879 h.await.unwrap();
3880 }
3881 }
3882
3883 #[tokio::test(flavor = "multi_thread")]
3888 async fn test_session_no_init_warning_without_file_memory() {
3889 let agent = Agent::from_config(test_config()).await.unwrap();
3890 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
3891 assert!(session.init_warning().is_none());
3892 }
3893
3894 #[tokio::test(flavor = "multi_thread")]
3895 async fn test_register_agent_dir_loads_agents_into_live_session() {
3896 let temp_dir = tempfile::tempdir().unwrap();
3897
3898 std::fs::write(
3900 temp_dir.path().join("my-agent.yaml"),
3901 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
3902 )
3903 .unwrap();
3904
3905 let agent = Agent::from_config(test_config()).await.unwrap();
3906 let session = agent.session(".", None).unwrap();
3907
3908 assert!(!session.agent_registry.exists("my-dynamic-agent"));
3910
3911 let count = session.register_agent_dir(temp_dir.path());
3912 assert_eq!(count, 1);
3913 assert!(session.agent_registry.exists("my-dynamic-agent"));
3914 }
3915
3916 #[tokio::test(flavor = "multi_thread")]
3917 async fn test_register_agent_dir_empty_dir_returns_zero() {
3918 let temp_dir = tempfile::tempdir().unwrap();
3919 let agent = Agent::from_config(test_config()).await.unwrap();
3920 let session = agent.session(".", None).unwrap();
3921 let count = session.register_agent_dir(temp_dir.path());
3922 assert_eq!(count, 0);
3923 }
3924
3925 #[tokio::test(flavor = "multi_thread")]
3926 async fn test_register_agent_dir_nonexistent_returns_zero() {
3927 let agent = Agent::from_config(test_config()).await.unwrap();
3928 let session = agent.session(".", None).unwrap();
3929 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
3930 assert_eq!(count, 0);
3931 }
3932
3933 #[tokio::test(flavor = "multi_thread")]
3934 async fn test_session_with_mcp_manager_builds_ok() {
3935 use crate::mcp::manager::McpManager;
3936 let mcp = Arc::new(McpManager::new());
3937 let agent = Agent::from_config(test_config()).await.unwrap();
3938 let opts = SessionOptions::new().with_mcp(mcp);
3939 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
3941 assert!(!session.id().is_empty());
3942 }
3943
3944 #[test]
3945 fn test_session_command_is_pub() {
3946 use crate::SessionCommand;
3948 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
3949 }
3950
3951 #[tokio::test(flavor = "multi_thread")]
3952 async fn test_session_submit_with_queue_executes() {
3953 let agent = Agent::from_config(test_config()).await.unwrap();
3954 let qc = SessionQueueConfig::default();
3955 let opts = SessionOptions::new().with_queue_config(qc);
3956 let session = agent
3957 .session("/tmp/test-ws-submit-exec", Some(opts))
3958 .unwrap();
3959
3960 struct Echo(serde_json::Value);
3961 #[async_trait::async_trait]
3962 impl crate::queue::SessionCommand for Echo {
3963 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
3964 Ok(self.0.clone())
3965 }
3966 fn command_type(&self) -> &str {
3967 "echo"
3968 }
3969 }
3970
3971 let rx = session
3972 .submit(
3973 SessionLane::Query,
3974 Box::new(Echo(serde_json::json!({"ok": true}))),
3975 )
3976 .await
3977 .expect("submit should succeed with queue configured");
3978
3979 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
3980 .await
3981 .expect("timed out waiting for command result")
3982 .expect("channel closed before result")
3983 .expect("command returned an error");
3984
3985 assert_eq!(result["ok"], true);
3986 }
3987}