1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::SystemPromptSlots;
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::text::truncate_utf8;
35use crate::tools::{ToolContext, ToolExecutor};
36use a3s_lane::{DeadLetter, MetricsSnapshot};
37use a3s_memory::{FileMemoryStore, MemoryStore};
38use anyhow::Context;
39use std::collections::HashMap;
40use std::path::{Path, PathBuf};
41use std::sync::atomic::{AtomicBool, Ordering};
42use std::sync::{Arc, RwLock};
43use tokio::sync::{broadcast, mpsc};
44use tokio::task::JoinHandle;
45
46fn safe_canonicalize(path: &Path) -> PathBuf {
49 match std::fs::canonicalize(path) {
50 Ok(p) => strip_unc_prefix(p),
51 Err(_) => path.to_path_buf(),
52 }
53}
54
55fn strip_unc_prefix(path: PathBuf) -> PathBuf {
58 #[cfg(windows)]
59 {
60 let s = path.to_string_lossy();
61 if let Some(stripped) = s.strip_prefix(r"\\?\") {
62 return PathBuf::from(stripped);
63 }
64 }
65 path
66}
67
68#[derive(Debug, Clone)]
74pub struct ToolCallResult {
75 pub name: String,
76 pub output: String,
77 pub exit_code: i32,
78}
79
80#[derive(Clone, Default)]
86pub struct SessionOptions {
87 pub model: Option<String>,
89 pub agent_dirs: Vec<PathBuf>,
92 pub queue_config: Option<SessionQueueConfig>,
97 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
99 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
101 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
103 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
105 pub planning_enabled: bool,
107 pub goal_tracking: bool,
109 pub skill_dirs: Vec<PathBuf>,
112 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
114 pub memory_store: Option<Arc<dyn MemoryStore>>,
116 pub(crate) file_memory_dir: Option<PathBuf>,
118 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
120 pub session_id: Option<String>,
122 pub auto_save: bool,
124 pub max_parse_retries: Option<u32>,
127 pub tool_timeout_ms: Option<u64>,
130 pub circuit_breaker_threshold: Option<u32>,
134 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
142 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
148 pub auto_compact: bool,
150 pub auto_compact_threshold: Option<f32>,
153 pub continuation_enabled: Option<bool>,
156 pub max_continuation_turns: Option<u32>,
159 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
164 pub temperature: Option<f32>,
166 pub thinking_budget: Option<usize>,
168 pub max_tool_rounds: Option<usize>,
174 pub prompt_slots: Option<SystemPromptSlots>,
180 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
187 pub document_parser_registry: Option<Arc<crate::document_parser::DocumentParserRegistry>>,
193 pub default_parser_ocr_provider:
199 Option<Arc<dyn crate::default_parser::DefaultParserOcrProvider>>,
200 pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
209}
210
211impl std::fmt::Debug for SessionOptions {
212 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
213 f.debug_struct("SessionOptions")
214 .field("model", &self.model)
215 .field("agent_dirs", &self.agent_dirs)
216 .field("skill_dirs", &self.skill_dirs)
217 .field("queue_config", &self.queue_config)
218 .field("security_provider", &self.security_provider.is_some())
219 .field("context_providers", &self.context_providers.len())
220 .field("confirmation_manager", &self.confirmation_manager.is_some())
221 .field("permission_checker", &self.permission_checker.is_some())
222 .field("planning_enabled", &self.planning_enabled)
223 .field("goal_tracking", &self.goal_tracking)
224 .field(
225 "skill_registry",
226 &self
227 .skill_registry
228 .as_ref()
229 .map(|r| format!("{} skills", r.len())),
230 )
231 .field("memory_store", &self.memory_store.is_some())
232 .field("session_store", &self.session_store.is_some())
233 .field("session_id", &self.session_id)
234 .field("auto_save", &self.auto_save)
235 .field("max_parse_retries", &self.max_parse_retries)
236 .field("tool_timeout_ms", &self.tool_timeout_ms)
237 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
238 .field("sandbox_config", &self.sandbox_config)
239 .field("auto_compact", &self.auto_compact)
240 .field("auto_compact_threshold", &self.auto_compact_threshold)
241 .field("continuation_enabled", &self.continuation_enabled)
242 .field("max_continuation_turns", &self.max_continuation_turns)
243 .field(
244 "plugins",
245 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
246 )
247 .field("mcp_manager", &self.mcp_manager.is_some())
248 .field("temperature", &self.temperature)
249 .field("thinking_budget", &self.thinking_budget)
250 .field("max_tool_rounds", &self.max_tool_rounds)
251 .field("prompt_slots", &self.prompt_slots.is_some())
252 .field(
253 "default_parser_ocr_provider",
254 &self.default_parser_ocr_provider.is_some(),
255 )
256 .finish()
257 }
258}
259
260impl SessionOptions {
261 pub fn new() -> Self {
262 Self::default()
263 }
264
265 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
270 self.plugins.push(std::sync::Arc::new(plugin));
271 self
272 }
273
274 pub fn with_model(mut self, model: impl Into<String>) -> Self {
275 self.model = Some(model.into());
276 self
277 }
278
279 pub fn with_default_parser_ocr_provider(
281 mut self,
282 provider: Arc<dyn crate::default_parser::DefaultParserOcrProvider>,
283 ) -> Self {
284 self.default_parser_ocr_provider = Some(provider);
285 self
286 }
287
288 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
289 self.agent_dirs.push(dir.into());
290 self
291 }
292
293 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
294 self.queue_config = Some(config);
295 self
296 }
297
298 pub fn with_default_security(mut self) -> Self {
300 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
301 self
302 }
303
304 pub fn with_security_provider(
306 mut self,
307 provider: Arc<dyn crate::security::SecurityProvider>,
308 ) -> Self {
309 self.security_provider = Some(provider);
310 self
311 }
312
313 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
315 let config = crate::context::FileSystemContextConfig::new(root_path);
316 self.context_providers
317 .push(Arc::new(crate::context::FileSystemContextProvider::new(
318 config,
319 )));
320 self
321 }
322
323 pub fn with_context_provider(
325 mut self,
326 provider: Arc<dyn crate::context::ContextProvider>,
327 ) -> Self {
328 self.context_providers.push(provider);
329 self
330 }
331
332 pub fn with_confirmation_manager(
334 mut self,
335 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
336 ) -> Self {
337 self.confirmation_manager = Some(manager);
338 self
339 }
340
341 pub fn with_permission_checker(
343 mut self,
344 checker: Arc<dyn crate::permissions::PermissionChecker>,
345 ) -> Self {
346 self.permission_checker = Some(checker);
347 self
348 }
349
350 pub fn with_permissive_policy(self) -> Self {
357 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
358 }
359
360 pub fn with_planning(mut self, enabled: bool) -> Self {
362 self.planning_enabled = enabled;
363 self
364 }
365
366 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
368 self.goal_tracking = enabled;
369 self
370 }
371
372 pub fn with_builtin_skills(mut self) -> Self {
374 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
375 self
376 }
377
378 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
380 self.skill_registry = Some(registry);
381 self
382 }
383
384 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
387 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
388 self
389 }
390
391 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
393 let registry = self
394 .skill_registry
395 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
396 if let Err(e) = registry.load_from_dir(&dir) {
397 tracing::warn!(
398 dir = %dir.as_ref().display(),
399 error = %e,
400 "Failed to load skills from directory — continuing without them"
401 );
402 }
403 self.skill_registry = Some(registry);
404 self
405 }
406
407 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
409 self.memory_store = Some(store);
410 self
411 }
412
413 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
419 self.file_memory_dir = Some(dir.into());
420 self
421 }
422
423 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
425 self.session_store = Some(store);
426 self
427 }
428
429 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
431 let dir = dir.into();
432 match tokio::runtime::Handle::try_current() {
433 Ok(handle) => {
434 match tokio::task::block_in_place(|| {
435 handle.block_on(crate::store::FileSessionStore::new(dir))
436 }) {
437 Ok(store) => {
438 self.session_store =
439 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
440 }
441 Err(e) => {
442 tracing::warn!("Failed to create file session store: {}", e);
443 }
444 }
445 }
446 Err(_) => {
447 tracing::warn!(
448 "No async runtime available for file session store — persistence disabled"
449 );
450 }
451 }
452 self
453 }
454
455 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
457 self.session_id = Some(id.into());
458 self
459 }
460
461 pub fn with_auto_save(mut self, enabled: bool) -> Self {
463 self.auto_save = enabled;
464 self
465 }
466
467 pub fn with_parse_retries(mut self, max: u32) -> Self {
473 self.max_parse_retries = Some(max);
474 self
475 }
476
477 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
483 self.tool_timeout_ms = Some(timeout_ms);
484 self
485 }
486
487 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
493 self.circuit_breaker_threshold = Some(threshold);
494 self
495 }
496
497 pub fn with_resilience_defaults(self) -> Self {
503 self.with_parse_retries(2)
504 .with_tool_timeout(120_000)
505 .with_circuit_breaker(3)
506 }
507
508 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
527 self.sandbox_config = Some(config);
528 self
529 }
530
531 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
539 self.sandbox_handle = Some(handle);
540 self
541 }
542
543 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
548 self.auto_compact = enabled;
549 self
550 }
551
552 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
554 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
555 self
556 }
557
558 pub fn with_continuation(mut self, enabled: bool) -> Self {
563 self.continuation_enabled = Some(enabled);
564 self
565 }
566
567 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
569 self.max_continuation_turns = Some(turns);
570 self
571 }
572
573 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
578 self.mcp_manager = Some(manager);
579 self
580 }
581
582 pub fn with_temperature(mut self, temperature: f32) -> Self {
583 self.temperature = Some(temperature);
584 self
585 }
586
587 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
588 self.thinking_budget = Some(budget);
589 self
590 }
591
592 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
597 self.max_tool_rounds = Some(rounds);
598 self
599 }
600
601 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
606 self.prompt_slots = Some(slots);
607 self
608 }
609
610 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
616 self.hook_executor = Some(executor);
617 self
618 }
619}
620
621pub struct Agent {
630 llm_client: Arc<dyn LlmClient>,
631 code_config: CodeConfig,
632 config: AgentConfig,
633 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
635 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
638}
639
640impl std::fmt::Debug for Agent {
641 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
642 f.debug_struct("Agent").finish()
643 }
644}
645
646impl Agent {
647 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
651 let source = config_source.into();
652
653 let expanded = if let Some(rest) = source.strip_prefix("~/") {
655 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
656 if let Some(home) = home {
657 PathBuf::from(home).join(rest).display().to_string()
658 } else {
659 source.clone()
660 }
661 } else {
662 source.clone()
663 };
664
665 let path = Path::new(&expanded);
666
667 let config = if matches!(
668 path.extension().and_then(|ext| ext.to_str()),
669 Some("hcl" | "json")
670 ) {
671 if !path.exists() {
672 return Err(CodeError::Config(format!(
673 "Config file not found: {}",
674 path.display()
675 )));
676 }
677
678 CodeConfig::from_file(path)
679 .with_context(|| format!("Failed to load config: {}", path.display()))?
680 } else {
681 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
683 };
684
685 Self::from_config(config).await
686 }
687
688 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
693 Self::new(config_source).await
694 }
695
696 pub async fn from_config(config: CodeConfig) -> Result<Self> {
698 let llm_config = config
699 .default_llm_config()
700 .context("default_model must be set in 'provider/model' format with a valid API key")?;
701 let llm_client = crate::llm::create_client_with_config(llm_config);
702
703 let agent_config = AgentConfig {
704 max_tool_rounds: config
705 .max_tool_rounds
706 .unwrap_or(AgentConfig::default().max_tool_rounds),
707 ..AgentConfig::default()
708 };
709
710 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
712 (None, vec![])
713 } else {
714 let manager = Arc::new(crate::mcp::manager::McpManager::new());
715 for server in &config.mcp_servers {
716 if !server.enabled {
717 continue;
718 }
719 manager.register_server(server.clone()).await;
720 if let Err(e) = manager.connect(&server.name).await {
721 tracing::warn!(
722 server = %server.name,
723 error = %e,
724 "Failed to connect to MCP server — skipping"
725 );
726 }
727 }
728 let tools = manager.get_all_tools().await;
730 (Some(manager), tools)
731 };
732
733 let mut agent = Agent {
734 llm_client,
735 code_config: config,
736 config: agent_config,
737 global_mcp,
738 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
739 };
740
741 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
743 for dir in &agent.code_config.skill_dirs.clone() {
744 if let Err(e) = registry.load_from_dir(dir) {
745 tracing::warn!(
746 dir = %dir.display(),
747 error = %e,
748 "Failed to load skills from directory — skipping"
749 );
750 }
751 }
752 agent.config.skill_registry = Some(registry);
753
754 Ok(agent)
755 }
756
757 pub async fn refresh_mcp_tools(&self) -> Result<()> {
765 if let Some(ref mcp) = self.global_mcp {
766 let fresh = mcp.get_all_tools().await;
767 *self
768 .global_mcp_tools
769 .lock()
770 .expect("global_mcp_tools lock poisoned") = fresh;
771 }
772 Ok(())
773 }
774
775 pub fn session(
780 &self,
781 workspace: impl Into<String>,
782 options: Option<SessionOptions>,
783 ) -> Result<AgentSession> {
784 let opts = options.unwrap_or_default();
785
786 let llm_client = if let Some(ref model) = opts.model {
787 let (provider_name, model_id) = model
788 .split_once('/')
789 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
790
791 let mut llm_config = self
792 .code_config
793 .llm_config(provider_name, model_id)
794 .with_context(|| {
795 format!("provider '{provider_name}' or model '{model_id}' not found in config")
796 })?;
797
798 if let Some(temp) = opts.temperature {
799 llm_config = llm_config.with_temperature(temp);
800 }
801 if let Some(budget) = opts.thinking_budget {
802 llm_config = llm_config.with_thinking_budget(budget);
803 }
804
805 crate::llm::create_client_with_config(llm_config)
806 } else {
807 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
808 tracing::warn!(
809 "temperature/thinking_budget set without model override — these will be ignored. \
810 Use with_model() to apply LLM parameter overrides."
811 );
812 }
813 self.llm_client.clone()
814 };
815
816 let merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
819 (Some(global), Some(session)) => {
820 let global = Arc::clone(global);
821 let session_mgr = Arc::clone(session);
822 match tokio::runtime::Handle::try_current() {
823 Ok(handle) => {
824 let global_for_merge = Arc::clone(&global);
825 tokio::task::block_in_place(|| {
826 handle.block_on(async move {
827 for config in session_mgr.all_configs().await {
828 let name = config.name.clone();
829 global_for_merge.register_server(config).await;
830 if let Err(e) = global_for_merge.connect(&name).await {
831 tracing::warn!(
832 server = %name,
833 error = %e,
834 "Failed to connect session-level MCP server — skipping"
835 );
836 }
837 }
838 })
839 });
840 }
841 Err(_) => {
842 tracing::warn!(
843 "No async runtime available to merge session-level MCP servers \
844 into global manager — session MCP servers will not be available"
845 );
846 }
847 }
848 SessionOptions {
849 mcp_manager: Some(Arc::clone(&global)),
850 ..opts
851 }
852 }
853 (Some(global), None) => SessionOptions {
854 mcp_manager: Some(Arc::clone(global)),
855 ..opts
856 },
857 _ => opts,
858 };
859
860 self.build_session(workspace.into(), llm_client, &merged_opts)
861 }
862
863 pub fn session_for_agent(
878 &self,
879 workspace: impl Into<String>,
880 def: &crate::subagent::AgentDefinition,
881 extra: Option<SessionOptions>,
882 ) -> Result<AgentSession> {
883 let mut opts = extra.unwrap_or_default();
884
885 if opts.permission_checker.is_none()
887 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
888 {
889 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
890 }
891
892 if opts.max_tool_rounds.is_none() {
894 if let Some(steps) = def.max_steps {
895 opts.max_tool_rounds = Some(steps);
896 }
897 }
898
899 if opts.model.is_none() {
901 if let Some(ref m) = def.model {
902 let provider = m.provider.as_deref().unwrap_or("anthropic");
903 opts.model = Some(format!("{}/{}", provider, m.model));
904 }
905 }
906
907 if let Some(ref prompt) = def.prompt {
914 let slots = opts
915 .prompt_slots
916 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
917 if slots.extra.is_none() {
918 slots.extra = Some(prompt.clone());
919 }
920 }
921
922 self.session(workspace, Some(opts))
923 }
924
925 pub fn resume_session(
933 &self,
934 session_id: &str,
935 options: SessionOptions,
936 ) -> Result<AgentSession> {
937 let store = options.session_store.as_ref().ok_or_else(|| {
938 crate::error::CodeError::Session(
939 "resume_session requires a session_store in SessionOptions".to_string(),
940 )
941 })?;
942
943 let data = match tokio::runtime::Handle::try_current() {
945 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
946 .map_err(|e| {
947 crate::error::CodeError::Session(format!(
948 "Failed to load session {}: {}",
949 session_id, e
950 ))
951 })?,
952 Err(_) => {
953 return Err(crate::error::CodeError::Session(
954 "No async runtime available for session resume".to_string(),
955 ))
956 }
957 };
958
959 let data = data.ok_or_else(|| {
960 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
961 })?;
962
963 let mut opts = options;
965 opts.session_id = Some(data.id.clone());
966
967 let llm_client = if let Some(ref model) = opts.model {
968 let (provider_name, model_id) = model
969 .split_once('/')
970 .context("model format must be 'provider/model'")?;
971 let llm_config = self
972 .code_config
973 .llm_config(provider_name, model_id)
974 .with_context(|| {
975 format!("provider '{provider_name}' or model '{model_id}' not found")
976 })?;
977 crate::llm::create_client_with_config(llm_config)
978 } else {
979 self.llm_client.clone()
980 };
981
982 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
983
984 *write_or_recover(&session.history) = data.messages;
986
987 Ok(session)
988 }
989
990 fn build_session(
991 &self,
992 workspace: String,
993 llm_client: Arc<dyn LlmClient>,
994 opts: &SessionOptions,
995 ) -> Result<AgentSession> {
996 let canonical = safe_canonicalize(Path::new(&workspace));
997
998 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
999 let search_builtin_cfg = self.code_config.agentic_search.clone().unwrap_or_default();
1000 let parse_builtin_cfg = self.code_config.agentic_parse.clone().unwrap_or_default();
1001 let default_parser_cfg = self.code_config.default_parser.clone().unwrap_or_default();
1002 let effective_document_parser_registry =
1003 opts.document_parser_registry.clone().unwrap_or_else(|| {
1004 Arc::new(
1005 crate::document_parser::DocumentParserRegistry::new_with_default_parser(
1006 default_parser_cfg,
1007 opts.default_parser_ocr_provider.clone(),
1008 ),
1009 )
1010 });
1011
1012 {
1014 use crate::tools::register_agentic_tools;
1015 register_agentic_tools(
1016 tool_executor.registry(),
1017 Some(Arc::clone(&llm_client)),
1018 search_builtin_cfg.enabled,
1019 parse_builtin_cfg.enabled,
1020 );
1021 }
1022
1023 tool_executor
1025 .registry()
1026 .set_document_parsers(Arc::clone(&effective_document_parser_registry));
1027 if let Some(ref search_config) = self.code_config.search {
1028 tool_executor
1029 .registry()
1030 .set_search_config(search_config.clone());
1031 }
1032 tool_executor
1033 .registry()
1034 .set_agentic_search_config(search_builtin_cfg.clone());
1035 tool_executor
1036 .registry()
1037 .set_agentic_parse_config(parse_builtin_cfg.clone());
1038 tool_executor
1039 .registry()
1040 .set_default_parser_config(self.code_config.default_parser.clone().unwrap_or_default());
1041
1042 let agent_registry = {
1046 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1047 use crate::tools::register_task_with_mcp;
1048 let registry = AgentRegistry::new();
1049 for dir in self
1050 .code_config
1051 .agent_dirs
1052 .iter()
1053 .chain(opts.agent_dirs.iter())
1054 {
1055 for agent in load_agents_from_dir(dir) {
1056 registry.register(agent);
1057 }
1058 }
1059 let registry = Arc::new(registry);
1060 register_task_with_mcp(
1061 tool_executor.registry(),
1062 Arc::clone(&llm_client),
1063 Arc::clone(®istry),
1064 canonical.display().to_string(),
1065 opts.mcp_manager.clone(),
1066 );
1067 registry
1068 };
1069
1070 if let Some(ref mcp) = opts.mcp_manager {
1073 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1076 Arc::as_ptr(mcp),
1077 self.global_mcp
1078 .as_ref()
1079 .map(Arc::as_ptr)
1080 .unwrap_or(std::ptr::null()),
1081 ) {
1082 self.global_mcp_tools
1084 .lock()
1085 .expect("global_mcp_tools lock poisoned")
1086 .clone()
1087 } else {
1088 match tokio::runtime::Handle::try_current() {
1090 Ok(handle) => {
1091 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1092 }
1093 Err(_) => {
1094 tracing::warn!(
1095 "No async runtime available for session-level MCP tools — \
1096 MCP tools will not be registered"
1097 );
1098 vec![]
1099 }
1100 }
1101 };
1102
1103 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1104 std::collections::HashMap::new();
1105 for (server, tool) in all_tools {
1106 by_server.entry(server).or_default().push(tool);
1107 }
1108 for (server_name, tools) in by_server {
1109 for tool in
1110 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1111 {
1112 tool_executor.register_dynamic_tool(tool);
1113 }
1114 }
1115 }
1116
1117 let tool_defs = tool_executor.definitions();
1118
1119 let mut prompt_slots = opts
1121 .prompt_slots
1122 .clone()
1123 .unwrap_or_else(|| self.config.prompt_slots.clone());
1124
1125 let agents_md_path = canonical.join("AGENTS.md");
1127 if agents_md_path.exists() && agents_md_path.is_file() {
1128 match std::fs::read_to_string(&agents_md_path) {
1129 Ok(content) if !content.trim().is_empty() => {
1130 tracing::info!(
1131 path = %agents_md_path.display(),
1132 "Auto-loaded AGENTS.md from workspace root"
1133 );
1134 prompt_slots.extra = match prompt_slots.extra {
1135 Some(existing) => Some(format!(
1136 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1137 existing, content
1138 )),
1139 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1140 };
1141 }
1142 Ok(_) => {
1143 tracing::debug!(
1144 path = %agents_md_path.display(),
1145 "AGENTS.md exists but is empty — skipping"
1146 );
1147 }
1148 Err(e) => {
1149 tracing::warn!(
1150 path = %agents_md_path.display(),
1151 error = %e,
1152 "Failed to read AGENTS.md — skipping"
1153 );
1154 }
1155 }
1156 }
1157
1158 let base_registry = self
1162 .config
1163 .skill_registry
1164 .as_deref()
1165 .map(|r| r.fork())
1166 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1167 if let Some(ref r) = opts.skill_registry {
1169 for skill in r.all() {
1170 base_registry.register_unchecked(skill);
1171 }
1172 }
1173 for dir in &opts.skill_dirs {
1175 if let Err(e) = base_registry.load_from_dir(dir) {
1176 tracing::warn!(
1177 dir = %dir.display(),
1178 error = %e,
1179 "Failed to load session skill dir — skipping"
1180 );
1181 }
1182 }
1183 let effective_registry = Arc::new(base_registry);
1184
1185 if !opts.plugins.is_empty() {
1188 use crate::plugin::PluginContext;
1189 let plugin_ctx = PluginContext::new()
1190 .with_llm(Arc::clone(&self.llm_client))
1191 .with_skill_registry(Arc::clone(&effective_registry))
1192 .with_document_parsers(Arc::clone(&effective_document_parser_registry));
1193 let plugin_registry = tool_executor.registry();
1194 for plugin in &opts.plugins {
1195 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1196 match plugin.load(plugin_registry, &plugin_ctx) {
1197 Ok(()) => {
1198 for skill in plugin.skills() {
1199 tracing::debug!(
1200 "Plugin '{}' registered skill '{}'",
1201 plugin.name(),
1202 skill.name
1203 );
1204 effective_registry.register_unchecked(skill);
1205 }
1206 }
1207 Err(e) => {
1208 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1209 }
1210 }
1211 }
1212 }
1213
1214 let skill_prompt = effective_registry.to_system_prompt();
1216 if !skill_prompt.is_empty() {
1217 prompt_slots.extra = match prompt_slots.extra {
1218 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1219 None => Some(skill_prompt),
1220 };
1221 }
1222
1223 let mut init_warning: Option<String> = None;
1225 let memory = {
1226 let store = if let Some(ref store) = opts.memory_store {
1227 Some(Arc::clone(store))
1228 } else if let Some(ref dir) = opts.file_memory_dir {
1229 match tokio::runtime::Handle::try_current() {
1230 Ok(handle) => {
1231 let dir = dir.clone();
1232 match tokio::task::block_in_place(|| {
1233 handle.block_on(FileMemoryStore::new(dir))
1234 }) {
1235 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1236 Err(e) => {
1237 let msg = format!("Failed to create file memory store: {}", e);
1238 tracing::warn!("{}", msg);
1239 init_warning = Some(msg);
1240 None
1241 }
1242 }
1243 }
1244 Err(_) => {
1245 let msg =
1246 "No async runtime available for file memory store — memory disabled"
1247 .to_string();
1248 tracing::warn!("{}", msg);
1249 init_warning = Some(msg);
1250 None
1251 }
1252 }
1253 } else {
1254 None
1255 };
1256 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1257 };
1258
1259 let base = self.config.clone();
1260 let config = AgentConfig {
1261 prompt_slots,
1262 tools: tool_defs,
1263 security_provider: opts.security_provider.clone(),
1264 permission_checker: opts.permission_checker.clone(),
1265 confirmation_manager: opts.confirmation_manager.clone(),
1266 context_providers: opts.context_providers.clone(),
1267 planning_enabled: opts.planning_enabled,
1268 goal_tracking: opts.goal_tracking,
1269 skill_registry: Some(Arc::clone(&effective_registry)),
1270 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1271 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1272 circuit_breaker_threshold: opts
1273 .circuit_breaker_threshold
1274 .unwrap_or(base.circuit_breaker_threshold),
1275 auto_compact: opts.auto_compact,
1276 auto_compact_threshold: opts
1277 .auto_compact_threshold
1278 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1279 max_context_tokens: base.max_context_tokens,
1280 llm_client: Some(Arc::clone(&llm_client)),
1281 memory: memory.clone(),
1282 continuation_enabled: opts
1283 .continuation_enabled
1284 .unwrap_or(base.continuation_enabled),
1285 max_continuation_turns: opts
1286 .max_continuation_turns
1287 .unwrap_or(base.max_continuation_turns),
1288 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1289 ..base
1290 };
1291
1292 {
1296 use crate::tools::register_skill;
1297 register_skill(
1298 tool_executor.registry(),
1299 Arc::clone(&llm_client),
1300 Arc::clone(&effective_registry),
1301 Arc::clone(&tool_executor),
1302 config.clone(),
1303 );
1304 }
1305
1306 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1309 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1310 let session_id = uuid::Uuid::new_v4().to_string();
1311 let rt = tokio::runtime::Handle::try_current();
1312
1313 match rt {
1314 Ok(handle) => {
1315 let queue = tokio::task::block_in_place(|| {
1317 handle.block_on(SessionLaneQueue::new(
1318 &session_id,
1319 queue_config.clone(),
1320 agent_event_tx.clone(),
1321 ))
1322 });
1323 match queue {
1324 Ok(q) => {
1325 let q = Arc::new(q);
1327 let q2 = Arc::clone(&q);
1328 tokio::task::block_in_place(|| {
1329 handle.block_on(async { q2.start().await.ok() })
1330 });
1331 Some(q)
1332 }
1333 Err(e) => {
1334 tracing::warn!("Failed to create session lane queue: {}", e);
1335 None
1336 }
1337 }
1338 }
1339 Err(_) => {
1340 tracing::warn!(
1341 "No async runtime available for queue creation — queue disabled"
1342 );
1343 None
1344 }
1345 }
1346 } else {
1347 None
1348 };
1349
1350 let mut tool_context = ToolContext::new(canonical.clone());
1352 if let Some(ref search_config) = self.code_config.search {
1353 tool_context = tool_context.with_search_config(search_config.clone());
1354 }
1355 tool_context = tool_context.with_agentic_search_config(search_builtin_cfg);
1356 tool_context = tool_context.with_agentic_parse_config(parse_builtin_cfg);
1357 tool_context = tool_context.with_default_parser_config(
1358 self.code_config.default_parser.clone().unwrap_or_default(),
1359 );
1360 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1361
1362 tool_context =
1364 tool_context.with_document_parsers(Arc::clone(&effective_document_parser_registry));
1365
1366 if let Some(handle) = opts.sandbox_handle.clone() {
1368 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1369 tool_context = tool_context.with_sandbox(handle);
1370 } else if opts.sandbox_config.is_some() {
1371 tracing::warn!(
1372 "sandbox_config is set but no sandbox_handle was provided \
1373 — bash commands will run locally"
1374 );
1375 }
1376
1377 let session_id = opts
1378 .session_id
1379 .clone()
1380 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1381
1382 let session_store = if opts.session_store.is_some() {
1384 opts.session_store.clone()
1385 } else if let Some(ref dir) = self.code_config.sessions_dir {
1386 match tokio::runtime::Handle::try_current() {
1387 Ok(handle) => {
1388 let dir = dir.clone();
1389 match tokio::task::block_in_place(|| {
1390 handle.block_on(crate::store::FileSessionStore::new(dir))
1391 }) {
1392 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1393 Err(e) => {
1394 tracing::warn!(
1395 "Failed to create session store from sessions_dir: {}",
1396 e
1397 );
1398 None
1399 }
1400 }
1401 }
1402 Err(_) => {
1403 tracing::warn!(
1404 "No async runtime for sessions_dir store — persistence disabled"
1405 );
1406 None
1407 }
1408 }
1409 } else {
1410 None
1411 };
1412
1413 let (cron_scheduler, cron_rx) = CronScheduler::new();
1417 let mut command_registry = CommandRegistry::new();
1418 command_registry.register(Arc::new(LoopCommand {
1419 scheduler: Arc::clone(&cron_scheduler),
1420 }));
1421 command_registry.register(Arc::new(CronListCommand {
1422 scheduler: Arc::clone(&cron_scheduler),
1423 }));
1424 command_registry.register(Arc::new(CronCancelCommand {
1425 scheduler: Arc::clone(&cron_scheduler),
1426 }));
1427
1428 Ok(AgentSession {
1429 llm_client,
1430 tool_executor,
1431 tool_context,
1432 memory: config.memory.clone(),
1433 config,
1434 workspace: canonical,
1435 session_id,
1436 history: RwLock::new(Vec::new()),
1437 command_queue,
1438 session_store,
1439 auto_save: opts.auto_save,
1440 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1441 ahp_executor: opts.hook_executor.clone(),
1442 init_warning,
1443 command_registry: std::sync::Mutex::new(command_registry),
1444 model_name: opts
1445 .model
1446 .clone()
1447 .or_else(|| self.code_config.default_model.clone())
1448 .unwrap_or_else(|| "unknown".to_string()),
1449 mcp_manager: opts
1450 .mcp_manager
1451 .clone()
1452 .or_else(|| self.global_mcp.clone())
1453 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1454 agent_registry,
1455 cron_scheduler,
1456 cron_rx: tokio::sync::Mutex::new(cron_rx),
1457 is_processing_cron: AtomicBool::new(false),
1458 cron_started: AtomicBool::new(false),
1459 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1460 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1461 })
1462 }
1463}
1464
1465#[derive(Debug, Clone)]
1474pub struct BtwResult {
1475 pub question: String,
1477 pub answer: String,
1479 pub usage: crate::llm::TokenUsage,
1481}
1482
1483pub struct AgentSession {
1492 llm_client: Arc<dyn LlmClient>,
1493 tool_executor: Arc<ToolExecutor>,
1494 tool_context: ToolContext,
1495 config: AgentConfig,
1496 workspace: PathBuf,
1497 session_id: String,
1499 history: RwLock<Vec<Message>>,
1501 command_queue: Option<Arc<SessionLaneQueue>>,
1503 memory: Option<Arc<crate::memory::AgentMemory>>,
1505 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1507 auto_save: bool,
1509 hook_engine: Arc<crate::hooks::HookEngine>,
1511 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1514 init_warning: Option<String>,
1516 command_registry: std::sync::Mutex<CommandRegistry>,
1519 model_name: String,
1521 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1523 agent_registry: Arc<crate::subagent::AgentRegistry>,
1525 cron_scheduler: Arc<CronScheduler>,
1527 cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
1529 is_processing_cron: AtomicBool,
1531 cron_started: AtomicBool,
1535 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1538 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1540}
1541
1542#[derive(Debug, Clone)]
1543struct ActiveToolSnapshot {
1544 tool_name: String,
1545 started_at_ms: u64,
1546}
1547
1548impl std::fmt::Debug for AgentSession {
1549 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1550 f.debug_struct("AgentSession")
1551 .field("session_id", &self.session_id)
1552 .field("workspace", &self.workspace.display().to_string())
1553 .field("auto_save", &self.auto_save)
1554 .finish()
1555 }
1556}
1557
1558impl AgentSession {
1559 fn now_ms() -> u64 {
1560 std::time::SystemTime::now()
1561 .duration_since(std::time::UNIX_EPOCH)
1562 .map(|d| d.as_millis() as u64)
1563 .unwrap_or(0)
1564 }
1565
1566 fn compact_json_value(value: &serde_json::Value) -> String {
1567 let raw = match value {
1568 serde_json::Value::Null => String::new(),
1569 serde_json::Value::String(s) => s.clone(),
1570 _ => serde_json::to_string(value).unwrap_or_default(),
1571 };
1572 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1573 if compact.len() > 180 {
1574 format!("{}...", truncate_utf8(&compact, 180))
1575 } else {
1576 compact
1577 }
1578 }
1579
1580 async fn apply_runtime_event(
1581 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1582 event: &AgentEvent,
1583 ) {
1584 match event {
1585 AgentEvent::ToolStart { id, name } => {
1586 active_tools.write().await.insert(
1587 id.clone(),
1588 ActiveToolSnapshot {
1589 tool_name: name.clone(),
1590 started_at_ms: Self::now_ms(),
1591 },
1592 );
1593 }
1594 AgentEvent::ToolEnd { id, .. }
1595 | AgentEvent::PermissionDenied { tool_id: id, .. }
1596 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1597 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1598 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1599 active_tools.write().await.remove(id);
1600 }
1601 _ => {}
1602 }
1603 }
1604
1605 async fn clear_runtime_tracking(&self) {
1606 self.active_tools.write().await.clear();
1607 }
1608
1609 fn build_agent_loop(&self) -> AgentLoop {
1613 let mut config = self.config.clone();
1614 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1615 ahp.clone()
1616 } else {
1617 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1618 });
1619 config.tools = self.tool_executor.definitions();
1623 let mut agent_loop = AgentLoop::new(
1624 self.llm_client.clone(),
1625 self.tool_executor.clone(),
1626 self.tool_context.clone(),
1627 config,
1628 );
1629 if let Some(ref queue) = self.command_queue {
1630 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1631 }
1632 agent_loop
1633 }
1634
1635 fn build_command_context(&self) -> CommandContext {
1637 let history = read_or_recover(&self.history);
1638
1639 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1641
1642 let mut mcp_map: std::collections::HashMap<String, usize> =
1644 std::collections::HashMap::new();
1645 for name in &tool_names {
1646 if let Some(rest) = name.strip_prefix("mcp__") {
1647 if let Some((server, _)) = rest.split_once("__") {
1648 *mcp_map.entry(server.to_string()).or_default() += 1;
1649 }
1650 }
1651 }
1652 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1653 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1654
1655 CommandContext {
1656 session_id: self.session_id.clone(),
1657 workspace: self.workspace.display().to_string(),
1658 model: self.model_name.clone(),
1659 history_len: history.len(),
1660 total_tokens: 0,
1661 total_cost: 0.0,
1662 tool_names,
1663 mcp_servers,
1664 }
1665 }
1666
1667 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1671 self.command_registry
1672 .lock()
1673 .expect("command_registry lock poisoned")
1674 }
1675
1676 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1680 self.command_registry
1681 .lock()
1682 .expect("command_registry lock poisoned")
1683 .register(cmd);
1684 }
1685
1686 pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
1688 &self.cron_scheduler
1689 }
1690
1691 fn ensure_cron_started(&self) {
1696 if !self.cron_started.swap(true, Ordering::Relaxed) {
1697 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1698 }
1699 }
1700
1701 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1710 self.ensure_cron_started();
1713
1714 if CommandRegistry::is_command(prompt) {
1716 let ctx = self.build_command_context();
1717 let output = self.command_registry().dispatch(prompt, &ctx);
1718 if let Some(output) = output {
1720 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1722 let result = self.btw(question).await?;
1723 return Ok(AgentResult {
1724 text: result.answer,
1725 messages: history
1726 .map(|h| h.to_vec())
1727 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1728 tool_calls_count: 0,
1729 usage: result.usage,
1730 });
1731 }
1732 return Ok(AgentResult {
1733 text: output.text,
1734 messages: history
1735 .map(|h| h.to_vec())
1736 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1737 tool_calls_count: 0,
1738 usage: crate::llm::TokenUsage::default(),
1739 });
1740 }
1741 }
1742
1743 if let Some(ref w) = self.init_warning {
1744 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1745 }
1746 let agent_loop = self.build_agent_loop();
1747 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1748 let runtime_state = Arc::clone(&self.active_tools);
1749 let runtime_collector = tokio::spawn(async move {
1750 while let Some(event) = runtime_rx.recv().await {
1751 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1752 }
1753 });
1754
1755 let use_internal = history.is_none();
1756 let effective_history = match history {
1757 Some(h) => h.to_vec(),
1758 None => read_or_recover(&self.history).clone(),
1759 };
1760
1761 let cancel_token = tokio_util::sync::CancellationToken::new();
1762 *self.cancel_token.lock().await = Some(cancel_token.clone());
1763 let result = agent_loop
1764 .execute_with_session(
1765 &effective_history,
1766 prompt,
1767 Some(&self.session_id),
1768 Some(runtime_tx),
1769 Some(&cancel_token),
1770 )
1771 .await;
1772 *self.cancel_token.lock().await = None;
1773 let _ = runtime_collector.await;
1774 let result = result?;
1775
1776 if use_internal {
1779 *write_or_recover(&self.history) = result.messages.clone();
1780
1781 if self.auto_save {
1783 if let Err(e) = self.save().await {
1784 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1785 }
1786 }
1787 }
1788
1789 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1794 let fires = {
1795 let mut rx = self.cron_rx.lock().await;
1796 let mut fires: Vec<ScheduledFire> = Vec::new();
1797 while let Ok(fire) = rx.try_recv() {
1798 fires.push(fire);
1799 }
1800 fires
1801 };
1802 for fire in fires {
1803 tracing::debug!(
1804 task_id = %fire.task_id,
1805 "Firing scheduled cron task"
1806 );
1807 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1808 tracing::warn!(
1809 task_id = %fire.task_id,
1810 "Scheduled task failed: {e}"
1811 );
1812 }
1813 }
1814 self.is_processing_cron.store(false, Ordering::Relaxed);
1815 }
1816
1817 self.clear_runtime_tracking().await;
1818
1819 Ok(result)
1820 }
1821
1822 async fn build_btw_runtime_context(&self) -> String {
1823 let mut sections = Vec::new();
1824
1825 let active_tools = {
1826 let tools = self.active_tools.read().await;
1827 let mut items = tools
1828 .iter()
1829 .map(|(tool_id, tool)| {
1830 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1831 format!(
1832 "- {} [{}] running_for={}ms",
1833 tool.tool_name, tool_id, elapsed_ms
1834 )
1835 })
1836 .collect::<Vec<_>>();
1837 items.sort();
1838 items
1839 };
1840 if !active_tools.is_empty() {
1841 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1842 }
1843
1844 if let Some(cm) = &self.config.confirmation_manager {
1845 let pending = cm.pending_confirmations().await;
1846 if !pending.is_empty() {
1847 let mut lines = pending
1848 .into_iter()
1849 .map(
1850 |PendingConfirmationInfo {
1851 tool_id,
1852 tool_name,
1853 args,
1854 remaining_ms,
1855 }| {
1856 let arg_summary = Self::compact_json_value(&args);
1857 if arg_summary.is_empty() {
1858 format!(
1859 "- {} [{}] remaining={}ms",
1860 tool_name, tool_id, remaining_ms
1861 )
1862 } else {
1863 format!(
1864 "- {} [{}] remaining={}ms {}",
1865 tool_name, tool_id, remaining_ms, arg_summary
1866 )
1867 }
1868 },
1869 )
1870 .collect::<Vec<_>>();
1871 lines.sort();
1872 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1873 }
1874 }
1875
1876 if let Some(queue) = &self.command_queue {
1877 let stats = queue.stats().await;
1878 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1879 let mut lines = vec![format!(
1880 "active={}, pending={}, external_pending={}",
1881 stats.total_active, stats.total_pending, stats.external_pending
1882 )];
1883 let mut lanes = stats
1884 .lanes
1885 .into_values()
1886 .filter(|lane| lane.active > 0 || lane.pending > 0)
1887 .map(|lane| {
1888 format!(
1889 "- {:?}: active={}, pending={}, handler={:?}",
1890 lane.lane, lane.active, lane.pending, lane.handler_mode
1891 )
1892 })
1893 .collect::<Vec<_>>();
1894 lanes.sort();
1895 lines.extend(lanes);
1896 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1897 }
1898
1899 let external_tasks = queue.pending_external_tasks().await;
1900 if !external_tasks.is_empty() {
1901 let mut lines = external_tasks
1902 .into_iter()
1903 .take(6)
1904 .map(|task| {
1905 let payload_summary = Self::compact_json_value(&task.payload);
1906 if payload_summary.is_empty() {
1907 format!(
1908 "- {} {:?} remaining={}ms",
1909 task.command_type,
1910 task.lane,
1911 task.remaining_ms()
1912 )
1913 } else {
1914 format!(
1915 "- {} {:?} remaining={}ms {}",
1916 task.command_type,
1917 task.lane,
1918 task.remaining_ms(),
1919 payload_summary
1920 )
1921 }
1922 })
1923 .collect::<Vec<_>>();
1924 lines.sort();
1925 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1926 }
1927 }
1928
1929 if let Some(store) = &self.session_store {
1930 if let Ok(Some(session)) = store.load(&self.session_id).await {
1931 let active_tasks = session
1932 .tasks
1933 .into_iter()
1934 .filter(|task| task.status.is_active())
1935 .take(6)
1936 .map(|task| match task.tool {
1937 Some(tool) if !tool.is_empty() => {
1938 format!("- [{}] {} ({})", task.status, task.content, tool)
1939 }
1940 _ => format!("- [{}] {}", task.status, task.content),
1941 })
1942 .collect::<Vec<_>>();
1943 if !active_tasks.is_empty() {
1944 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1945 }
1946 }
1947 }
1948
1949 sections.join("\n\n")
1950 }
1951
1952 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
1970 self.btw_with_context(question, None).await
1971 }
1972
1973 pub async fn btw_with_context(
1978 &self,
1979 question: &str,
1980 runtime_context: Option<&str>,
1981 ) -> Result<BtwResult> {
1982 let question = question.trim();
1983 if question.is_empty() {
1984 return Err(crate::error::CodeError::Session(
1985 "btw: question cannot be empty".to_string(),
1986 ));
1987 }
1988
1989 let history_snapshot = read_or_recover(&self.history).clone();
1991
1992 let mut messages = history_snapshot;
1994 let mut injected_sections = Vec::new();
1995 let session_runtime = self.build_btw_runtime_context().await;
1996 if !session_runtime.is_empty() {
1997 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1998 }
1999 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
2000 injected_sections.push(format!("[host runtime context]\n{}", extra));
2001 }
2002 if !injected_sections.is_empty() {
2003 let injected_context = format!(
2004 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
2005 injected_sections.join("\n\n")
2006 );
2007 messages.push(Message::user(&injected_context));
2008 }
2009 messages.push(Message::user(question));
2010
2011 let response = self
2012 .llm_client
2013 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2014 .await
2015 .map_err(|e| {
2016 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2017 })?;
2018
2019 Ok(BtwResult {
2020 question: question.to_string(),
2021 answer: response.text(),
2022 usage: response.usage,
2023 })
2024 }
2025
2026 pub async fn send_with_attachments(
2031 &self,
2032 prompt: &str,
2033 attachments: &[crate::llm::Attachment],
2034 history: Option<&[Message]>,
2035 ) -> Result<AgentResult> {
2036 let use_internal = history.is_none();
2040 let mut effective_history = match history {
2041 Some(h) => h.to_vec(),
2042 None => read_or_recover(&self.history).clone(),
2043 };
2044 effective_history.push(Message::user_with_attachments(prompt, attachments));
2045
2046 let agent_loop = self.build_agent_loop();
2047 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2048 let runtime_state = Arc::clone(&self.active_tools);
2049 let runtime_collector = tokio::spawn(async move {
2050 while let Some(event) = runtime_rx.recv().await {
2051 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2052 }
2053 });
2054 let result = agent_loop
2055 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2056 .await?;
2057 let _ = runtime_collector.await;
2058
2059 if use_internal {
2060 *write_or_recover(&self.history) = result.messages.clone();
2061 if self.auto_save {
2062 if let Err(e) = self.save().await {
2063 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2064 }
2065 }
2066 }
2067
2068 self.clear_runtime_tracking().await;
2069
2070 Ok(result)
2071 }
2072
2073 pub async fn stream_with_attachments(
2078 &self,
2079 prompt: &str,
2080 attachments: &[crate::llm::Attachment],
2081 history: Option<&[Message]>,
2082 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2083 let (tx, rx) = mpsc::channel(256);
2084 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2085 let mut effective_history = match history {
2086 Some(h) => h.to_vec(),
2087 None => read_or_recover(&self.history).clone(),
2088 };
2089 effective_history.push(Message::user_with_attachments(prompt, attachments));
2090
2091 let agent_loop = self.build_agent_loop();
2092 let runtime_state = Arc::clone(&self.active_tools);
2093 let forwarder = tokio::spawn(async move {
2094 let mut forward_enabled = true;
2095 while let Some(event) = runtime_rx.recv().await {
2096 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2097 if forward_enabled && tx.send(event).await.is_err() {
2098 forward_enabled = false;
2099 }
2100 }
2101 });
2102 let handle = tokio::spawn(async move {
2103 let _ = agent_loop
2104 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2105 .await;
2106 });
2107 let active_tools = Arc::clone(&self.active_tools);
2108 let wrapped_handle = tokio::spawn(async move {
2109 let _ = handle.await;
2110 let _ = forwarder.await;
2111 active_tools.write().await.clear();
2112 });
2113
2114 Ok((rx, wrapped_handle))
2115 }
2116
2117 pub async fn stream(
2127 &self,
2128 prompt: &str,
2129 history: Option<&[Message]>,
2130 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2131 self.ensure_cron_started();
2132
2133 if CommandRegistry::is_command(prompt) {
2135 let ctx = self.build_command_context();
2136 let output = self.command_registry().dispatch(prompt, &ctx);
2137 if let Some(output) = output {
2139 let (tx, rx) = mpsc::channel(256);
2140
2141 if let Some(CommandAction::BtwQuery(question)) = output.action {
2143 let llm_client = self.llm_client.clone();
2145 let history_snapshot = read_or_recover(&self.history).clone();
2146 let handle = tokio::spawn(async move {
2147 let mut messages = history_snapshot;
2148 messages.push(Message::user(&question));
2149 match llm_client
2150 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2151 .await
2152 {
2153 Ok(response) => {
2154 let answer = response.text();
2155 let _ = tx
2156 .send(AgentEvent::BtwAnswer {
2157 question: question.clone(),
2158 answer: answer.clone(),
2159 usage: response.usage,
2160 })
2161 .await;
2162 let _ = tx
2163 .send(AgentEvent::End {
2164 text: answer,
2165 usage: crate::llm::TokenUsage::default(),
2166 meta: None,
2167 })
2168 .await;
2169 }
2170 Err(e) => {
2171 let _ = tx
2172 .send(AgentEvent::Error {
2173 message: format!("btw failed: {e}"),
2174 })
2175 .await;
2176 }
2177 }
2178 });
2179 return Ok((rx, handle));
2180 }
2181
2182 let handle = tokio::spawn(async move {
2183 let _ = tx
2184 .send(AgentEvent::TextDelta {
2185 text: output.text.clone(),
2186 })
2187 .await;
2188 let _ = tx
2189 .send(AgentEvent::End {
2190 text: output.text.clone(),
2191 usage: crate::llm::TokenUsage::default(),
2192 meta: None,
2193 })
2194 .await;
2195 });
2196 return Ok((rx, handle));
2197 }
2198 }
2199
2200 let (tx, rx) = mpsc::channel(256);
2201 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2202 let agent_loop = self.build_agent_loop();
2203 let effective_history = match history {
2204 Some(h) => h.to_vec(),
2205 None => read_or_recover(&self.history).clone(),
2206 };
2207 let prompt = prompt.to_string();
2208 let session_id = self.session_id.clone();
2209
2210 let cancel_token = tokio_util::sync::CancellationToken::new();
2211 *self.cancel_token.lock().await = Some(cancel_token.clone());
2212 let token_clone = cancel_token.clone();
2213 let runtime_state = Arc::clone(&self.active_tools);
2214 let forwarder = tokio::spawn(async move {
2215 let mut forward_enabled = true;
2216 while let Some(event) = runtime_rx.recv().await {
2217 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2218 if forward_enabled && tx.send(event).await.is_err() {
2219 forward_enabled = false;
2220 }
2221 }
2222 });
2223
2224 let handle = tokio::spawn(async move {
2225 let _ = agent_loop
2226 .execute_with_session(
2227 &effective_history,
2228 &prompt,
2229 Some(&session_id),
2230 Some(runtime_tx),
2231 Some(&token_clone),
2232 )
2233 .await;
2234 });
2235
2236 let cancel_token_ref = self.cancel_token.clone();
2238 let active_tools = Arc::clone(&self.active_tools);
2239 let wrapped_handle = tokio::spawn(async move {
2240 let _ = handle.await;
2241 let _ = forwarder.await;
2242 *cancel_token_ref.lock().await = None;
2243 active_tools.write().await.clear();
2244 });
2245
2246 Ok((rx, wrapped_handle))
2247 }
2248
2249 pub async fn cancel(&self) -> bool {
2256 let token = self.cancel_token.lock().await.clone();
2257 if let Some(token) = token {
2258 token.cancel();
2259 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2260 true
2261 } else {
2262 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2263 false
2264 }
2265 }
2266
2267 pub fn history(&self) -> Vec<Message> {
2269 read_or_recover(&self.history).clone()
2270 }
2271
2272 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2274 self.memory.as_ref()
2275 }
2276
2277 pub fn id(&self) -> &str {
2279 &self.session_id
2280 }
2281
2282 pub fn workspace(&self) -> &std::path::Path {
2284 &self.workspace
2285 }
2286
2287 pub fn init_warning(&self) -> Option<&str> {
2289 self.init_warning.as_deref()
2290 }
2291
2292 pub fn session_id(&self) -> &str {
2294 &self.session_id
2295 }
2296
2297 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2303 self.tool_executor.definitions()
2304 }
2305
2306 pub fn tool_names(&self) -> Vec<String> {
2312 self.tool_executor
2313 .definitions()
2314 .into_iter()
2315 .map(|t| t.name)
2316 .collect()
2317 }
2318
2319 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2325 self.hook_engine.register(hook);
2326 }
2327
2328 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2330 self.hook_engine.unregister(hook_id)
2331 }
2332
2333 pub fn register_hook_handler(
2335 &self,
2336 hook_id: &str,
2337 handler: Arc<dyn crate::hooks::HookHandler>,
2338 ) {
2339 self.hook_engine.register_handler(hook_id, handler);
2340 }
2341
2342 pub fn unregister_hook_handler(&self, hook_id: &str) {
2344 self.hook_engine.unregister_handler(hook_id);
2345 }
2346
2347 pub fn hook_count(&self) -> usize {
2349 self.hook_engine.hook_count()
2350 }
2351
2352 pub async fn save(&self) -> Result<()> {
2356 let store = match &self.session_store {
2357 Some(s) => s,
2358 None => return Ok(()),
2359 };
2360
2361 let history = read_or_recover(&self.history).clone();
2362 let now = chrono::Utc::now().timestamp();
2363
2364 let data = crate::store::SessionData {
2365 id: self.session_id.clone(),
2366 config: crate::session::SessionConfig {
2367 name: String::new(),
2368 workspace: self.workspace.display().to_string(),
2369 system_prompt: Some(self.config.prompt_slots.build()),
2370 max_context_length: 200_000,
2371 auto_compact: false,
2372 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
2373 storage_type: crate::config::StorageBackend::File,
2374 queue_config: None,
2375 confirmation_policy: None,
2376 permission_policy: None,
2377 parent_id: None,
2378 security_config: None,
2379 hook_engine: None,
2380 planning_enabled: self.config.planning_enabled,
2381 goal_tracking: self.config.goal_tracking,
2382 },
2383 state: crate::session::SessionState::Active,
2384 messages: history,
2385 context_usage: crate::session::ContextUsage::default(),
2386 total_usage: crate::llm::TokenUsage::default(),
2387 total_cost: 0.0,
2388 model_name: None,
2389 cost_records: Vec::new(),
2390 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
2391 thinking_enabled: false,
2392 thinking_budget: None,
2393 created_at: now,
2394 updated_at: now,
2395 llm_config: None,
2396 tasks: Vec::new(),
2397 parent_id: None,
2398 };
2399
2400 store.save(&data).await?;
2401 tracing::debug!("Session {} saved", self.session_id);
2402 Ok(())
2403 }
2404
2405 pub async fn read_file(&self, path: &str) -> Result<String> {
2407 let args = serde_json::json!({ "file_path": path });
2408 let result = self.tool_executor.execute("read", &args).await?;
2409 Ok(result.output)
2410 }
2411
2412 pub async fn bash(&self, command: &str) -> Result<String> {
2417 let args = serde_json::json!({ "command": command });
2418 let result = self
2419 .tool_executor
2420 .execute_with_context("bash", &args, &self.tool_context)
2421 .await?;
2422 Ok(result.output)
2423 }
2424
2425 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2427 let args = serde_json::json!({ "pattern": pattern });
2428 let result = self.tool_executor.execute("glob", &args).await?;
2429 let files: Vec<String> = result
2430 .output
2431 .lines()
2432 .filter(|l| !l.is_empty())
2433 .map(|l| l.to_string())
2434 .collect();
2435 Ok(files)
2436 }
2437
2438 pub async fn grep(&self, pattern: &str) -> Result<String> {
2440 let args = serde_json::json!({ "pattern": pattern });
2441 let result = self.tool_executor.execute("grep", &args).await?;
2442 Ok(result.output)
2443 }
2444
2445 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2447 let result = self.tool_executor.execute(name, &args).await?;
2448 Ok(ToolCallResult {
2449 name: name.to_string(),
2450 output: result.output,
2451 exit_code: result.exit_code,
2452 })
2453 }
2454
2455 pub fn has_queue(&self) -> bool {
2461 self.command_queue.is_some()
2462 }
2463
2464 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2468 if let Some(ref queue) = self.command_queue {
2469 queue.set_lane_handler(lane, config).await;
2470 }
2471 }
2472
2473 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2477 if let Some(ref queue) = self.command_queue {
2478 queue.complete_external_task(task_id, result).await
2479 } else {
2480 false
2481 }
2482 }
2483
2484 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2486 if let Some(ref queue) = self.command_queue {
2487 queue.pending_external_tasks().await
2488 } else {
2489 Vec::new()
2490 }
2491 }
2492
2493 pub async fn queue_stats(&self) -> SessionQueueStats {
2495 if let Some(ref queue) = self.command_queue {
2496 queue.stats().await
2497 } else {
2498 SessionQueueStats::default()
2499 }
2500 }
2501
2502 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2504 if let Some(ref queue) = self.command_queue {
2505 queue.metrics_snapshot().await
2506 } else {
2507 None
2508 }
2509 }
2510
2511 pub async fn submit(
2517 &self,
2518 lane: SessionLane,
2519 command: Box<dyn crate::queue::SessionCommand>,
2520 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2521 let queue = self
2522 .command_queue
2523 .as_ref()
2524 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2525 Ok(queue.submit(lane, command).await)
2526 }
2527
2528 pub async fn submit_batch(
2535 &self,
2536 lane: SessionLane,
2537 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2538 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2539 {
2540 let queue = self
2541 .command_queue
2542 .as_ref()
2543 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2544 Ok(queue.submit_batch(lane, commands).await)
2545 }
2546
2547 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2549 if let Some(ref queue) = self.command_queue {
2550 queue.dead_letters().await
2551 } else {
2552 Vec::new()
2553 }
2554 }
2555
2556 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2569 use crate::subagent::load_agents_from_dir;
2570 let agents = load_agents_from_dir(dir);
2571 let count = agents.len();
2572 for agent in agents {
2573 tracing::info!(
2574 session_id = %self.session_id,
2575 agent = agent.name,
2576 dir = %dir.display(),
2577 "Dynamically registered agent"
2578 );
2579 self.agent_registry.register(agent);
2580 }
2581 count
2582 }
2583
2584 pub async fn add_mcp_server(
2591 &self,
2592 config: crate::mcp::McpServerConfig,
2593 ) -> crate::error::Result<usize> {
2594 let server_name = config.name.clone();
2595 self.mcp_manager.register_server(config).await;
2596 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2597 crate::error::CodeError::Tool {
2598 tool: server_name.clone(),
2599 message: format!("Failed to connect MCP server: {}", e),
2600 }
2601 })?;
2602
2603 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2604 let count = tools.len();
2605
2606 for tool in
2607 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2608 {
2609 self.tool_executor.register_dynamic_tool(tool);
2610 }
2611
2612 tracing::info!(
2613 session_id = %self.session_id,
2614 server = server_name,
2615 tools = count,
2616 "MCP server added to live session"
2617 );
2618
2619 Ok(count)
2620 }
2621
2622 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2627 self.tool_executor
2628 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2629 self.mcp_manager
2630 .disconnect(server_name)
2631 .await
2632 .map_err(|e| crate::error::CodeError::Tool {
2633 tool: server_name.to_string(),
2634 message: format!("Failed to disconnect MCP server: {}", e),
2635 })?;
2636 tracing::info!(
2637 session_id = %self.session_id,
2638 server = server_name,
2639 "MCP server removed from live session"
2640 );
2641 Ok(())
2642 }
2643
2644 pub async fn mcp_status(
2646 &self,
2647 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2648 self.mcp_manager.get_status().await
2649 }
2650}
2651
2652#[cfg(test)]
2657mod tests {
2658 use super::*;
2659 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2660 use crate::store::SessionStore;
2661
2662 #[tokio::test]
2663 async fn test_session_submit_no_queue_returns_err() {
2664 let agent = Agent::from_config(test_config()).await.unwrap();
2665 let session = agent.session(".", None).unwrap();
2666 struct Noop;
2667 #[async_trait::async_trait]
2668 impl crate::queue::SessionCommand for Noop {
2669 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2670 Ok(serde_json::json!(null))
2671 }
2672 fn command_type(&self) -> &str {
2673 "noop"
2674 }
2675 }
2676 let result: anyhow::Result<
2677 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2678 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2679 assert!(result.is_err());
2680 assert!(result.unwrap_err().to_string().contains("No queue"));
2681 }
2682
2683 #[tokio::test]
2684 async fn test_session_submit_batch_no_queue_returns_err() {
2685 let agent = Agent::from_config(test_config()).await.unwrap();
2686 let session = agent.session(".", None).unwrap();
2687 struct Noop;
2688 #[async_trait::async_trait]
2689 impl crate::queue::SessionCommand for Noop {
2690 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2691 Ok(serde_json::json!(null))
2692 }
2693 fn command_type(&self) -> &str {
2694 "noop"
2695 }
2696 }
2697 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2698 let result: anyhow::Result<
2699 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2700 > = session.submit_batch(SessionLane::Query, cmds).await;
2701 assert!(result.is_err());
2702 assert!(result.unwrap_err().to_string().contains("No queue"));
2703 }
2704
2705 fn test_config() -> CodeConfig {
2706 CodeConfig {
2707 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2708 providers: vec![
2709 ProviderConfig {
2710 name: "anthropic".to_string(),
2711 api_key: Some("test-key".to_string()),
2712 base_url: None,
2713 models: vec![ModelConfig {
2714 id: "claude-sonnet-4-20250514".to_string(),
2715 name: "Claude Sonnet 4".to_string(),
2716 family: "claude-sonnet".to_string(),
2717 api_key: None,
2718 base_url: None,
2719 attachment: false,
2720 reasoning: false,
2721 tool_call: true,
2722 temperature: true,
2723 release_date: None,
2724 modalities: ModelModalities::default(),
2725 cost: Default::default(),
2726 limit: Default::default(),
2727 }],
2728 },
2729 ProviderConfig {
2730 name: "openai".to_string(),
2731 api_key: Some("test-openai-key".to_string()),
2732 base_url: None,
2733 models: vec![ModelConfig {
2734 id: "gpt-4o".to_string(),
2735 name: "GPT-4o".to_string(),
2736 family: "gpt-4".to_string(),
2737 api_key: None,
2738 base_url: None,
2739 attachment: false,
2740 reasoning: false,
2741 tool_call: true,
2742 temperature: true,
2743 release_date: None,
2744 modalities: ModelModalities::default(),
2745 cost: Default::default(),
2746 limit: Default::default(),
2747 }],
2748 },
2749 ],
2750 ..Default::default()
2751 }
2752 }
2753
2754 fn build_effective_registry_for_test(
2755 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2756 opts: &SessionOptions,
2757 ) -> Arc<crate::skills::SkillRegistry> {
2758 let base_registry = agent_registry
2759 .as_deref()
2760 .map(|r| r.fork())
2761 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2762 if let Some(ref r) = opts.skill_registry {
2763 for skill in r.all() {
2764 base_registry.register_unchecked(skill);
2765 }
2766 }
2767 for dir in &opts.skill_dirs {
2768 if let Err(e) = base_registry.load_from_dir(dir) {
2769 tracing::warn!(
2770 dir = %dir.display(),
2771 error = %e,
2772 "Failed to load session skill dir — skipping"
2773 );
2774 }
2775 }
2776 Arc::new(base_registry)
2777 }
2778
2779 #[tokio::test]
2780 async fn test_from_config() {
2781 let agent = Agent::from_config(test_config()).await;
2782 assert!(agent.is_ok());
2783 }
2784
2785 #[tokio::test]
2786 async fn test_session_default() {
2787 let agent = Agent::from_config(test_config()).await.unwrap();
2788 let session = agent.session("/tmp/test-workspace", None);
2789 assert!(session.is_ok());
2790 let debug = format!("{:?}", session.unwrap());
2791 assert!(debug.contains("AgentSession"));
2792 }
2793
2794 #[tokio::test]
2795 async fn test_session_registers_agentic_tools_by_default() {
2796 let agent = Agent::from_config(test_config()).await.unwrap();
2797 let session = agent.session("/tmp/test-workspace", None).unwrap();
2798 let tool_names = session.tool_names();
2799
2800 assert!(tool_names.iter().any(|name| name == "agentic_search"));
2801 assert!(tool_names.iter().any(|name| name == "agentic_parse"));
2802 }
2803
2804 #[tokio::test]
2805 async fn test_session_can_disable_agentic_tools_via_config() {
2806 let mut config = test_config();
2807 config.agentic_search = Some(crate::config::AgenticSearchConfig {
2808 enabled: false,
2809 ..Default::default()
2810 });
2811 config.agentic_parse = Some(crate::config::AgenticParseConfig {
2812 enabled: false,
2813 ..Default::default()
2814 });
2815
2816 let agent = Agent::from_config(config).await.unwrap();
2817 let session = agent.session("/tmp/test-workspace", None).unwrap();
2818 let tool_names = session.tool_names();
2819
2820 assert!(!tool_names.iter().any(|name| name == "agentic_search"));
2821 assert!(!tool_names.iter().any(|name| name == "agentic_parse"));
2822 }
2823
2824 #[tokio::test]
2825 async fn test_session_with_model_override() {
2826 let agent = Agent::from_config(test_config()).await.unwrap();
2827 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2828 let session = agent.session("/tmp/test-workspace", Some(opts));
2829 assert!(session.is_ok());
2830 }
2831
2832 #[tokio::test]
2833 async fn test_session_with_invalid_model_format() {
2834 let agent = Agent::from_config(test_config()).await.unwrap();
2835 let opts = SessionOptions::new().with_model("gpt-4o");
2836 let session = agent.session("/tmp/test-workspace", Some(opts));
2837 assert!(session.is_err());
2838 }
2839
2840 #[test]
2841 fn test_session_options_with_default_parser_ocr_provider() {
2842 struct MockOcrProvider;
2843
2844 impl crate::default_parser::DefaultParserOcrProvider for MockOcrProvider {
2845 fn name(&self) -> &str {
2846 "mock-ocr"
2847 }
2848
2849 fn ocr_pdf(
2850 &self,
2851 _path: &std::path::Path,
2852 _config: &crate::config::DefaultParserOcrConfig,
2853 ) -> anyhow::Result<Option<String>> {
2854 Ok(None)
2855 }
2856 }
2857
2858 let opts =
2859 SessionOptions::new().with_default_parser_ocr_provider(Arc::new(MockOcrProvider));
2860 assert!(opts.default_parser_ocr_provider.is_some());
2861 }
2862
2863 #[tokio::test]
2864 async fn test_session_with_model_not_found() {
2865 let agent = Agent::from_config(test_config()).await.unwrap();
2866 let opts = SessionOptions::new().with_model("openai/nonexistent");
2867 let session = agent.session("/tmp/test-workspace", Some(opts));
2868 assert!(session.is_err());
2869 }
2870
2871 #[tokio::test]
2872 async fn test_session_preserves_skill_scorer_from_agent_registry() {
2873 use crate::skills::feedback::{
2874 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
2875 };
2876 use crate::skills::{Skill, SkillKind, SkillRegistry};
2877
2878 let registry = Arc::new(SkillRegistry::new());
2879 let scorer = Arc::new(DefaultSkillScorer::default());
2880 registry.set_scorer(scorer.clone());
2881
2882 registry.register_unchecked(Arc::new(Skill {
2883 name: "healthy-skill".to_string(),
2884 description: "healthy".to_string(),
2885 allowed_tools: None,
2886 disable_model_invocation: false,
2887 kind: SkillKind::Instruction,
2888 content: "healthy".to_string(),
2889 tags: vec![],
2890 version: None,
2891 }));
2892 registry.register_unchecked(Arc::new(Skill {
2893 name: "disabled-skill".to_string(),
2894 description: "disabled".to_string(),
2895 allowed_tools: None,
2896 disable_model_invocation: false,
2897 kind: SkillKind::Instruction,
2898 content: "disabled".to_string(),
2899 tags: vec![],
2900 version: None,
2901 }));
2902
2903 for _ in 0..5 {
2904 scorer.record(SkillFeedback {
2905 skill_name: "disabled-skill".to_string(),
2906 outcome: SkillOutcome::Failure,
2907 score_delta: -1.0,
2908 reason: "bad".to_string(),
2909 timestamp: 0,
2910 });
2911 }
2912
2913 let effective_registry =
2914 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
2915 let prompt = effective_registry.to_system_prompt();
2916
2917 assert!(prompt.contains("healthy-skill"));
2918 assert!(!prompt.contains("disabled-skill"));
2919 }
2920
2921 #[tokio::test]
2922 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
2923 use crate::skills::validator::DefaultSkillValidator;
2924 use crate::skills::SkillRegistry;
2925
2926 let registry = Arc::new(SkillRegistry::new());
2927 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
2928
2929 let temp_dir = tempfile::tempdir().unwrap();
2930 let invalid_skill = temp_dir.path().join("invalid.md");
2931 std::fs::write(
2932 &invalid_skill,
2933 r#"---
2934name: BadName
2935description: "invalid skill name"
2936kind: instruction
2937---
2938# Invalid Skill
2939"#,
2940 )
2941 .unwrap();
2942
2943 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
2944 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
2945 assert!(effective_registry.get("BadName").is_none());
2946 }
2947
2948 #[tokio::test]
2949 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
2950 use crate::skills::{Skill, SkillKind, SkillRegistry};
2951
2952 let registry = Arc::new(SkillRegistry::new());
2953 registry.register_unchecked(Arc::new(Skill {
2954 name: "shared-skill".to_string(),
2955 description: "agent level".to_string(),
2956 allowed_tools: None,
2957 disable_model_invocation: false,
2958 kind: SkillKind::Instruction,
2959 content: "agent content".to_string(),
2960 tags: vec![],
2961 version: None,
2962 }));
2963
2964 let session_registry = Arc::new(SkillRegistry::new());
2965 session_registry.register_unchecked(Arc::new(Skill {
2966 name: "shared-skill".to_string(),
2967 description: "session level".to_string(),
2968 allowed_tools: None,
2969 disable_model_invocation: false,
2970 kind: SkillKind::Instruction,
2971 content: "session content".to_string(),
2972 tags: vec![],
2973 version: None,
2974 }));
2975
2976 let opts = SessionOptions::new().with_skill_registry(session_registry);
2977 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
2978
2979 assert_eq!(
2980 effective_registry.get("shared-skill").unwrap().content,
2981 "session content"
2982 );
2983 assert_eq!(
2984 registry.get("shared-skill").unwrap().content,
2985 "agent content"
2986 );
2987 }
2988
2989 #[tokio::test]
2990 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
2991 use crate::skills::{Skill, SkillKind, SkillRegistry};
2992
2993 let session_registry = Arc::new(SkillRegistry::new());
2994 session_registry.register_unchecked(Arc::new(Skill {
2995 name: "shared-skill".to_string(),
2996 description: "session registry".to_string(),
2997 allowed_tools: None,
2998 disable_model_invocation: false,
2999 kind: SkillKind::Instruction,
3000 content: "registry content".to_string(),
3001 tags: vec![],
3002 version: None,
3003 }));
3004
3005 let temp_dir = tempfile::tempdir().unwrap();
3006 std::fs::write(
3007 temp_dir.path().join("shared.md"),
3008 r#"---
3009name: shared-skill
3010description: "skill dir override"
3011kind: instruction
3012---
3013# Shared Skill
3014dir content
3015"#,
3016 )
3017 .unwrap();
3018 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3019
3020 let opts = SessionOptions::new()
3021 .with_skill_registry(session_registry)
3022 .with_skill_dirs([temp_dir.path()]);
3023 let effective_registry = build_effective_registry_for_test(None, &opts);
3024
3025 assert_eq!(
3026 effective_registry.get("shared-skill").unwrap().description,
3027 "skill dir override"
3028 );
3029 assert!(effective_registry.get("README").is_none());
3030 }
3031
3032 #[tokio::test]
3033 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3034 use crate::plugin::{Plugin, PluginContext};
3035 use crate::skills::{Skill, SkillKind, SkillRegistry};
3036 use crate::tools::ToolRegistry;
3037
3038 struct SessionOnlySkillPlugin;
3039
3040 impl Plugin for SessionOnlySkillPlugin {
3041 fn name(&self) -> &str {
3042 "session-only-skill"
3043 }
3044
3045 fn version(&self) -> &str {
3046 "0.1.0"
3047 }
3048
3049 fn tool_names(&self) -> &[&str] {
3050 &[]
3051 }
3052
3053 fn load(
3054 &self,
3055 _registry: &Arc<ToolRegistry>,
3056 _ctx: &PluginContext,
3057 ) -> anyhow::Result<()> {
3058 Ok(())
3059 }
3060
3061 fn skills(&self) -> Vec<Arc<Skill>> {
3062 vec![Arc::new(Skill {
3063 name: "plugin-session-skill".to_string(),
3064 description: "plugin skill".to_string(),
3065 allowed_tools: None,
3066 disable_model_invocation: false,
3067 kind: SkillKind::Instruction,
3068 content: "plugin content".to_string(),
3069 tags: vec!["plugin".to_string()],
3070 version: None,
3071 })]
3072 }
3073 }
3074
3075 let mut agent = Agent::from_config(test_config()).await.unwrap();
3076 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3077 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3078
3079 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3080 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3081
3082 let session_registry = session.config.skill_registry.as_ref().unwrap();
3083 assert!(session_registry.get("plugin-session-skill").is_some());
3084 assert!(agent_registry.get("plugin-session-skill").is_none());
3085 }
3086
3087 #[tokio::test]
3088 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3089 use crate::skills::{Skill, SkillKind, SkillRegistry};
3090
3091 let mut agent = Agent::from_config(test_config()).await.unwrap();
3092 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3093 agent.config.skill_registry = Some(agent_registry);
3094
3095 let session_registry = Arc::new(SkillRegistry::new());
3096 session_registry.register_unchecked(Arc::new(Skill {
3097 name: "session-only".to_string(),
3098 description: "only for first session".to_string(),
3099 allowed_tools: None,
3100 disable_model_invocation: false,
3101 kind: SkillKind::Instruction,
3102 content: "session one".to_string(),
3103 tags: vec![],
3104 version: None,
3105 }));
3106
3107 let session_one = agent
3108 .session(
3109 "/tmp/test-workspace",
3110 Some(SessionOptions::new().with_skill_registry(session_registry)),
3111 )
3112 .unwrap();
3113 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3114
3115 assert!(session_one
3116 .config
3117 .skill_registry
3118 .as_ref()
3119 .unwrap()
3120 .get("session-only")
3121 .is_some());
3122 assert!(session_two
3123 .config
3124 .skill_registry
3125 .as_ref()
3126 .unwrap()
3127 .get("session-only")
3128 .is_none());
3129 }
3130
3131 #[tokio::test]
3132 async fn test_plugin_skills_do_not_leak_across_sessions() {
3133 use crate::plugin::{Plugin, PluginContext};
3134 use crate::skills::{Skill, SkillKind, SkillRegistry};
3135 use crate::tools::ToolRegistry;
3136
3137 struct LeakyPlugin;
3138
3139 impl Plugin for LeakyPlugin {
3140 fn name(&self) -> &str {
3141 "leaky-plugin"
3142 }
3143
3144 fn version(&self) -> &str {
3145 "0.1.0"
3146 }
3147
3148 fn tool_names(&self) -> &[&str] {
3149 &[]
3150 }
3151
3152 fn load(
3153 &self,
3154 _registry: &Arc<ToolRegistry>,
3155 _ctx: &PluginContext,
3156 ) -> anyhow::Result<()> {
3157 Ok(())
3158 }
3159
3160 fn skills(&self) -> Vec<Arc<Skill>> {
3161 vec![Arc::new(Skill {
3162 name: "plugin-only".to_string(),
3163 description: "plugin only".to_string(),
3164 allowed_tools: None,
3165 disable_model_invocation: false,
3166 kind: SkillKind::Instruction,
3167 content: "plugin skill".to_string(),
3168 tags: vec![],
3169 version: None,
3170 })]
3171 }
3172 }
3173
3174 let mut agent = Agent::from_config(test_config()).await.unwrap();
3175 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3176
3177 let session_one = agent
3178 .session(
3179 "/tmp/test-workspace",
3180 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3181 )
3182 .unwrap();
3183 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3184
3185 assert!(session_one
3186 .config
3187 .skill_registry
3188 .as_ref()
3189 .unwrap()
3190 .get("plugin-only")
3191 .is_some());
3192 assert!(session_two
3193 .config
3194 .skill_registry
3195 .as_ref()
3196 .unwrap()
3197 .get("plugin-only")
3198 .is_none());
3199 }
3200
3201 #[tokio::test]
3202 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3203 use crate::skills::{Skill, SkillKind, SkillRegistry};
3204 use crate::subagent::AgentDefinition;
3205
3206 let mut agent = Agent::from_config(test_config()).await.unwrap();
3207 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3208
3209 let definition = AgentDefinition::new("reviewer", "Review code")
3210 .with_prompt("Agent definition prompt")
3211 .with_max_steps(7);
3212
3213 let session_registry = Arc::new(SkillRegistry::new());
3214 session_registry.register_unchecked(Arc::new(Skill {
3215 name: "agent-session-skill".to_string(),
3216 description: "agent session only".to_string(),
3217 allowed_tools: None,
3218 disable_model_invocation: false,
3219 kind: SkillKind::Instruction,
3220 content: "agent session content".to_string(),
3221 tags: vec![],
3222 version: None,
3223 }));
3224
3225 let session_one = agent
3226 .session_for_agent(
3227 "/tmp/test-workspace",
3228 &definition,
3229 Some(SessionOptions::new().with_skill_registry(session_registry)),
3230 )
3231 .unwrap();
3232 let session_two = agent
3233 .session_for_agent("/tmp/test-workspace", &definition, None)
3234 .unwrap();
3235
3236 assert_eq!(session_one.config.max_tool_rounds, 7);
3237 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3238 assert!(extra.contains("Agent definition prompt"));
3239 assert!(extra.contains("agent-session-skill"));
3240 assert!(session_one
3241 .config
3242 .skill_registry
3243 .as_ref()
3244 .unwrap()
3245 .get("agent-session-skill")
3246 .is_some());
3247 assert!(session_two
3248 .config
3249 .skill_registry
3250 .as_ref()
3251 .unwrap()
3252 .get("agent-session-skill")
3253 .is_none());
3254 }
3255
3256 #[tokio::test]
3257 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3258 ) {
3259 use crate::prompts::SystemPromptSlots;
3260 use crate::subagent::AgentDefinition;
3261
3262 let agent = Agent::from_config(test_config()).await.unwrap();
3263 let definition = AgentDefinition::new("planner", "Plan work")
3264 .with_prompt("Definition extra prompt")
3265 .with_max_steps(3);
3266
3267 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3268 role: Some("Custom role".to_string()),
3269 guidelines: None,
3270 response_style: None,
3271 extra: None,
3272 });
3273
3274 let session = agent
3275 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3276 .unwrap();
3277
3278 assert_eq!(
3279 session.config.prompt_slots.role.as_deref(),
3280 Some("Custom role")
3281 );
3282 assert!(session
3283 .config
3284 .prompt_slots
3285 .extra
3286 .as_deref()
3287 .unwrap()
3288 .contains("Definition extra prompt"));
3289 assert_eq!(session.config.max_tool_rounds, 3);
3290 }
3291
3292 #[tokio::test]
3293 async fn test_new_with_hcl_string() {
3294 let hcl = r#"
3295 default_model = "anthropic/claude-sonnet-4-20250514"
3296 providers {
3297 name = "anthropic"
3298 api_key = "test-key"
3299 models {
3300 id = "claude-sonnet-4-20250514"
3301 name = "Claude Sonnet 4"
3302 }
3303 }
3304 "#;
3305 let agent = Agent::new(hcl).await;
3306 assert!(agent.is_ok());
3307 }
3308
3309 #[tokio::test]
3310 async fn test_create_alias_hcl() {
3311 let hcl = r#"
3312 default_model = "anthropic/claude-sonnet-4-20250514"
3313 providers {
3314 name = "anthropic"
3315 api_key = "test-key"
3316 models {
3317 id = "claude-sonnet-4-20250514"
3318 name = "Claude Sonnet 4"
3319 }
3320 }
3321 "#;
3322 let agent = Agent::create(hcl).await;
3323 assert!(agent.is_ok());
3324 }
3325
3326 #[tokio::test]
3327 async fn test_create_and_new_produce_same_result() {
3328 let hcl = r#"
3329 default_model = "anthropic/claude-sonnet-4-20250514"
3330 providers {
3331 name = "anthropic"
3332 api_key = "test-key"
3333 models {
3334 id = "claude-sonnet-4-20250514"
3335 name = "Claude Sonnet 4"
3336 }
3337 }
3338 "#;
3339 let agent_new = Agent::new(hcl).await;
3340 let agent_create = Agent::create(hcl).await;
3341 assert!(agent_new.is_ok());
3342 assert!(agent_create.is_ok());
3343
3344 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3346 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3347 assert!(session_new.is_ok());
3348 assert!(session_create.is_ok());
3349 }
3350
3351 #[tokio::test]
3352 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3353 let temp_dir = tempfile::tempdir().unwrap();
3354 let config_path = temp_dir.path().join("agent.hcl");
3355 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3356
3357 let err = Agent::new(config_path.display().to_string())
3358 .await
3359 .unwrap_err();
3360 let msg = err.to_string();
3361
3362 assert!(msg.contains("Failed to load config"));
3363 assert!(msg.contains("agent.hcl"));
3364 assert!(!msg.contains("Failed to parse config as HCL string"));
3365 }
3366
3367 #[tokio::test]
3368 async fn test_new_with_missing_hcl_file_reports_not_found() {
3369 let temp_dir = tempfile::tempdir().unwrap();
3370 let missing_path = temp_dir.path().join("agent.hcl");
3371
3372 let err = Agent::new(missing_path.display().to_string())
3373 .await
3374 .unwrap_err();
3375 let msg = err.to_string();
3376
3377 assert!(msg.contains("Config file not found"));
3378 assert!(msg.contains("agent.hcl"));
3379 assert!(!msg.contains("Failed to parse config as HCL string"));
3380 }
3381
3382 #[test]
3383 fn test_from_config_requires_default_model() {
3384 let rt = tokio::runtime::Runtime::new().unwrap();
3385 let config = CodeConfig {
3386 providers: vec![ProviderConfig {
3387 name: "anthropic".to_string(),
3388 api_key: Some("test-key".to_string()),
3389 base_url: None,
3390 models: vec![],
3391 }],
3392 ..Default::default()
3393 };
3394 let result = rt.block_on(Agent::from_config(config));
3395 assert!(result.is_err());
3396 }
3397
3398 #[tokio::test]
3399 async fn test_history_empty_on_new_session() {
3400 let agent = Agent::from_config(test_config()).await.unwrap();
3401 let session = agent.session("/tmp/test-workspace", None).unwrap();
3402 assert!(session.history().is_empty());
3403 }
3404
3405 #[tokio::test]
3406 async fn test_session_options_with_agent_dir() {
3407 let opts = SessionOptions::new()
3408 .with_agent_dir("/tmp/agents")
3409 .with_agent_dir("/tmp/more-agents");
3410 assert_eq!(opts.agent_dirs.len(), 2);
3411 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3412 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3413 }
3414
3415 #[test]
3420 fn test_session_options_with_queue_config() {
3421 let qc = SessionQueueConfig::default().with_lane_features();
3422 let opts = SessionOptions::new().with_queue_config(qc.clone());
3423 assert!(opts.queue_config.is_some());
3424
3425 let config = opts.queue_config.unwrap();
3426 assert!(config.enable_dlq);
3427 assert!(config.enable_metrics);
3428 assert!(config.enable_alerts);
3429 assert_eq!(config.default_timeout_ms, Some(60_000));
3430 }
3431
3432 #[tokio::test(flavor = "multi_thread")]
3433 async fn test_session_with_queue_config() {
3434 let agent = Agent::from_config(test_config()).await.unwrap();
3435 let qc = SessionQueueConfig::default();
3436 let opts = SessionOptions::new().with_queue_config(qc);
3437 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3438 assert!(session.is_ok());
3439 let session = session.unwrap();
3440 assert!(session.has_queue());
3441 }
3442
3443 #[tokio::test]
3444 async fn test_session_without_queue_config() {
3445 let agent = Agent::from_config(test_config()).await.unwrap();
3446 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3447 assert!(!session.has_queue());
3448 }
3449
3450 #[tokio::test]
3451 async fn test_session_queue_stats_without_queue() {
3452 let agent = Agent::from_config(test_config()).await.unwrap();
3453 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3454 let stats = session.queue_stats().await;
3455 assert_eq!(stats.total_pending, 0);
3457 assert_eq!(stats.total_active, 0);
3458 }
3459
3460 #[tokio::test(flavor = "multi_thread")]
3461 async fn test_session_queue_stats_with_queue() {
3462 let agent = Agent::from_config(test_config()).await.unwrap();
3463 let qc = SessionQueueConfig::default();
3464 let opts = SessionOptions::new().with_queue_config(qc);
3465 let session = agent
3466 .session("/tmp/test-workspace-qstats", Some(opts))
3467 .unwrap();
3468 let stats = session.queue_stats().await;
3469 assert_eq!(stats.total_pending, 0);
3471 assert_eq!(stats.total_active, 0);
3472 }
3473
3474 #[tokio::test(flavor = "multi_thread")]
3475 async fn test_session_pending_external_tasks_empty() {
3476 let agent = Agent::from_config(test_config()).await.unwrap();
3477 let qc = SessionQueueConfig::default();
3478 let opts = SessionOptions::new().with_queue_config(qc);
3479 let session = agent
3480 .session("/tmp/test-workspace-ext", Some(opts))
3481 .unwrap();
3482 let tasks = session.pending_external_tasks().await;
3483 assert!(tasks.is_empty());
3484 }
3485
3486 #[tokio::test(flavor = "multi_thread")]
3487 async fn test_session_dead_letters_empty() {
3488 let agent = Agent::from_config(test_config()).await.unwrap();
3489 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3490 let opts = SessionOptions::new().with_queue_config(qc);
3491 let session = agent
3492 .session("/tmp/test-workspace-dlq", Some(opts))
3493 .unwrap();
3494 let dead = session.dead_letters().await;
3495 assert!(dead.is_empty());
3496 }
3497
3498 #[tokio::test(flavor = "multi_thread")]
3499 async fn test_session_queue_metrics_disabled() {
3500 let agent = Agent::from_config(test_config()).await.unwrap();
3501 let qc = SessionQueueConfig::default();
3503 let opts = SessionOptions::new().with_queue_config(qc);
3504 let session = agent
3505 .session("/tmp/test-workspace-nomet", Some(opts))
3506 .unwrap();
3507 let metrics = session.queue_metrics().await;
3508 assert!(metrics.is_none());
3509 }
3510
3511 #[tokio::test(flavor = "multi_thread")]
3512 async fn test_session_queue_metrics_enabled() {
3513 let agent = Agent::from_config(test_config()).await.unwrap();
3514 let qc = SessionQueueConfig::default().with_metrics();
3515 let opts = SessionOptions::new().with_queue_config(qc);
3516 let session = agent
3517 .session("/tmp/test-workspace-met", Some(opts))
3518 .unwrap();
3519 let metrics = session.queue_metrics().await;
3520 assert!(metrics.is_some());
3521 }
3522
3523 #[tokio::test(flavor = "multi_thread")]
3524 async fn test_session_set_lane_handler() {
3525 let agent = Agent::from_config(test_config()).await.unwrap();
3526 let qc = SessionQueueConfig::default();
3527 let opts = SessionOptions::new().with_queue_config(qc);
3528 let session = agent
3529 .session("/tmp/test-workspace-handler", Some(opts))
3530 .unwrap();
3531
3532 session
3534 .set_lane_handler(
3535 SessionLane::Execute,
3536 LaneHandlerConfig {
3537 mode: crate::queue::TaskHandlerMode::External,
3538 timeout_ms: 30_000,
3539 },
3540 )
3541 .await;
3542
3543 }
3546
3547 #[tokio::test(flavor = "multi_thread")]
3552 async fn test_session_has_id() {
3553 let agent = Agent::from_config(test_config()).await.unwrap();
3554 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3555 assert!(!session.session_id().is_empty());
3557 assert_eq!(session.session_id().len(), 36); }
3559
3560 #[tokio::test(flavor = "multi_thread")]
3561 async fn test_session_explicit_id() {
3562 let agent = Agent::from_config(test_config()).await.unwrap();
3563 let opts = SessionOptions::new().with_session_id("my-session-42");
3564 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3565 assert_eq!(session.session_id(), "my-session-42");
3566 }
3567
3568 #[tokio::test(flavor = "multi_thread")]
3569 async fn test_session_save_no_store() {
3570 let agent = Agent::from_config(test_config()).await.unwrap();
3571 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3572 session.save().await.unwrap();
3574 }
3575
3576 #[tokio::test(flavor = "multi_thread")]
3577 async fn test_session_save_and_load() {
3578 let store = Arc::new(crate::store::MemorySessionStore::new());
3579 let agent = Agent::from_config(test_config()).await.unwrap();
3580
3581 let opts = SessionOptions::new()
3582 .with_session_store(store.clone())
3583 .with_session_id("persist-test");
3584 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3585
3586 session.save().await.unwrap();
3588
3589 assert!(store.exists("persist-test").await.unwrap());
3591
3592 let data = store.load("persist-test").await.unwrap().unwrap();
3593 assert_eq!(data.id, "persist-test");
3594 assert!(data.messages.is_empty());
3595 }
3596
3597 #[tokio::test(flavor = "multi_thread")]
3598 async fn test_session_save_with_history() {
3599 let store = Arc::new(crate::store::MemorySessionStore::new());
3600 let agent = Agent::from_config(test_config()).await.unwrap();
3601
3602 let opts = SessionOptions::new()
3603 .with_session_store(store.clone())
3604 .with_session_id("history-test");
3605 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3606
3607 {
3609 let mut h = session.history.write().unwrap();
3610 h.push(Message::user("Hello"));
3611 h.push(Message::user("How are you?"));
3612 }
3613
3614 session.save().await.unwrap();
3615
3616 let data = store.load("history-test").await.unwrap().unwrap();
3617 assert_eq!(data.messages.len(), 2);
3618 }
3619
3620 #[tokio::test(flavor = "multi_thread")]
3621 async fn test_resume_session() {
3622 let store = Arc::new(crate::store::MemorySessionStore::new());
3623 let agent = Agent::from_config(test_config()).await.unwrap();
3624
3625 let opts = SessionOptions::new()
3627 .with_session_store(store.clone())
3628 .with_session_id("resume-test");
3629 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3630 {
3631 let mut h = session.history.write().unwrap();
3632 h.push(Message::user("What is Rust?"));
3633 h.push(Message::user("Tell me more"));
3634 }
3635 session.save().await.unwrap();
3636
3637 let opts2 = SessionOptions::new().with_session_store(store.clone());
3639 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3640
3641 assert_eq!(resumed.session_id(), "resume-test");
3642 let history = resumed.history();
3643 assert_eq!(history.len(), 2);
3644 assert_eq!(history[0].text(), "What is Rust?");
3645 }
3646
3647 #[tokio::test(flavor = "multi_thread")]
3648 async fn test_resume_session_not_found() {
3649 let store = Arc::new(crate::store::MemorySessionStore::new());
3650 let agent = Agent::from_config(test_config()).await.unwrap();
3651
3652 let opts = SessionOptions::new().with_session_store(store.clone());
3653 let result = agent.resume_session("nonexistent", opts);
3654 assert!(result.is_err());
3655 assert!(result.unwrap_err().to_string().contains("not found"));
3656 }
3657
3658 #[tokio::test(flavor = "multi_thread")]
3659 async fn test_resume_session_no_store() {
3660 let agent = Agent::from_config(test_config()).await.unwrap();
3661 let opts = SessionOptions::new();
3662 let result = agent.resume_session("any-id", opts);
3663 assert!(result.is_err());
3664 assert!(result.unwrap_err().to_string().contains("session_store"));
3665 }
3666
3667 #[tokio::test(flavor = "multi_thread")]
3668 async fn test_file_session_store_persistence() {
3669 let dir = tempfile::TempDir::new().unwrap();
3670 let store = Arc::new(
3671 crate::store::FileSessionStore::new(dir.path())
3672 .await
3673 .unwrap(),
3674 );
3675 let agent = Agent::from_config(test_config()).await.unwrap();
3676
3677 let opts = SessionOptions::new()
3679 .with_session_store(store.clone())
3680 .with_session_id("file-persist");
3681 let session = agent
3682 .session("/tmp/test-ws-file-persist", Some(opts))
3683 .unwrap();
3684 {
3685 let mut h = session.history.write().unwrap();
3686 h.push(Message::user("test message"));
3687 }
3688 session.save().await.unwrap();
3689
3690 let store2 = Arc::new(
3692 crate::store::FileSessionStore::new(dir.path())
3693 .await
3694 .unwrap(),
3695 );
3696 let data = store2.load("file-persist").await.unwrap().unwrap();
3697 assert_eq!(data.messages.len(), 1);
3698 }
3699
3700 #[tokio::test(flavor = "multi_thread")]
3701 async fn test_session_options_builders() {
3702 let opts = SessionOptions::new()
3703 .with_session_id("test-id")
3704 .with_auto_save(true);
3705 assert_eq!(opts.session_id, Some("test-id".to_string()));
3706 assert!(opts.auto_save);
3707 }
3708
3709 #[test]
3714 fn test_session_options_with_sandbox_sets_config() {
3715 use crate::sandbox::SandboxConfig;
3716 let cfg = SandboxConfig {
3717 image: "ubuntu:22.04".into(),
3718 memory_mb: 1024,
3719 ..SandboxConfig::default()
3720 };
3721 let opts = SessionOptions::new().with_sandbox(cfg);
3722 assert!(opts.sandbox_config.is_some());
3723 let sc = opts.sandbox_config.unwrap();
3724 assert_eq!(sc.image, "ubuntu:22.04");
3725 assert_eq!(sc.memory_mb, 1024);
3726 }
3727
3728 #[test]
3729 fn test_session_options_default_has_no_sandbox() {
3730 let opts = SessionOptions::default();
3731 assert!(opts.sandbox_config.is_none());
3732 }
3733
3734 #[tokio::test]
3735 async fn test_session_debug_includes_sandbox_config() {
3736 use crate::sandbox::SandboxConfig;
3737 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
3738 let debug = format!("{:?}", opts);
3739 assert!(debug.contains("sandbox_config"));
3740 }
3741
3742 #[tokio::test]
3743 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3744 let agent = Agent::from_config(test_config()).await.unwrap();
3747 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3748 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3750 assert!(session.is_ok());
3751 }
3752
3753 #[tokio::test(flavor = "multi_thread")]
3758 async fn test_session_with_memory_store() {
3759 use a3s_memory::InMemoryStore;
3760 let store = Arc::new(InMemoryStore::new());
3761 let agent = Agent::from_config(test_config()).await.unwrap();
3762 let opts = SessionOptions::new().with_memory(store);
3763 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3764 assert!(session.memory().is_some());
3765 }
3766
3767 #[tokio::test(flavor = "multi_thread")]
3768 async fn test_session_without_memory_store() {
3769 let agent = Agent::from_config(test_config()).await.unwrap();
3770 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3771 assert!(session.memory().is_none());
3772 }
3773
3774 #[tokio::test(flavor = "multi_thread")]
3775 async fn test_session_memory_wired_into_config() {
3776 use a3s_memory::InMemoryStore;
3777 let store = Arc::new(InMemoryStore::new());
3778 let agent = Agent::from_config(test_config()).await.unwrap();
3779 let opts = SessionOptions::new().with_memory(store);
3780 let session = agent
3781 .session("/tmp/test-ws-mem-config", Some(opts))
3782 .unwrap();
3783 assert!(session.memory().is_some());
3785 }
3786
3787 #[tokio::test(flavor = "multi_thread")]
3788 async fn test_session_with_file_memory() {
3789 let dir = tempfile::TempDir::new().unwrap();
3790 let agent = Agent::from_config(test_config()).await.unwrap();
3791 let opts = SessionOptions::new().with_file_memory(dir.path());
3792 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3793 assert!(session.memory().is_some());
3794 }
3795
3796 #[tokio::test(flavor = "multi_thread")]
3797 async fn test_memory_remember_and_recall() {
3798 use a3s_memory::InMemoryStore;
3799 let store = Arc::new(InMemoryStore::new());
3800 let agent = Agent::from_config(test_config()).await.unwrap();
3801 let opts = SessionOptions::new().with_memory(store);
3802 let session = agent
3803 .session("/tmp/test-ws-mem-recall", Some(opts))
3804 .unwrap();
3805
3806 let memory = session.memory().unwrap();
3807 memory
3808 .remember_success("write a file", &["write".to_string()], "done")
3809 .await
3810 .unwrap();
3811
3812 let results = memory.recall_similar("write", 5).await.unwrap();
3813 assert!(!results.is_empty());
3814 let stats = memory.stats().await.unwrap();
3815 assert_eq!(stats.long_term_count, 1);
3816 }
3817
3818 #[tokio::test(flavor = "multi_thread")]
3823 async fn test_session_tool_timeout_configured() {
3824 let agent = Agent::from_config(test_config()).await.unwrap();
3825 let opts = SessionOptions::new().with_tool_timeout(5000);
3826 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3827 assert!(!session.id().is_empty());
3828 }
3829
3830 #[tokio::test(flavor = "multi_thread")]
3835 async fn test_session_without_queue_builds_ok() {
3836 let agent = Agent::from_config(test_config()).await.unwrap();
3837 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
3838 assert!(!session.id().is_empty());
3839 }
3840
3841 #[tokio::test(flavor = "multi_thread")]
3846 async fn test_concurrent_history_reads() {
3847 let agent = Agent::from_config(test_config()).await.unwrap();
3848 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
3849
3850 let handles: Vec<_> = (0..10)
3851 .map(|_| {
3852 let s = Arc::clone(&session);
3853 tokio::spawn(async move { s.history().len() })
3854 })
3855 .collect();
3856
3857 for h in handles {
3858 h.await.unwrap();
3859 }
3860 }
3861
3862 #[tokio::test(flavor = "multi_thread")]
3867 async fn test_session_no_init_warning_without_file_memory() {
3868 let agent = Agent::from_config(test_config()).await.unwrap();
3869 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
3870 assert!(session.init_warning().is_none());
3871 }
3872
3873 #[tokio::test(flavor = "multi_thread")]
3874 async fn test_register_agent_dir_loads_agents_into_live_session() {
3875 let temp_dir = tempfile::tempdir().unwrap();
3876
3877 std::fs::write(
3879 temp_dir.path().join("my-agent.yaml"),
3880 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
3881 )
3882 .unwrap();
3883
3884 let agent = Agent::from_config(test_config()).await.unwrap();
3885 let session = agent.session(".", None).unwrap();
3886
3887 assert!(!session.agent_registry.exists("my-dynamic-agent"));
3889
3890 let count = session.register_agent_dir(temp_dir.path());
3891 assert_eq!(count, 1);
3892 assert!(session.agent_registry.exists("my-dynamic-agent"));
3893 }
3894
3895 #[tokio::test(flavor = "multi_thread")]
3896 async fn test_register_agent_dir_empty_dir_returns_zero() {
3897 let temp_dir = tempfile::tempdir().unwrap();
3898 let agent = Agent::from_config(test_config()).await.unwrap();
3899 let session = agent.session(".", None).unwrap();
3900 let count = session.register_agent_dir(temp_dir.path());
3901 assert_eq!(count, 0);
3902 }
3903
3904 #[tokio::test(flavor = "multi_thread")]
3905 async fn test_register_agent_dir_nonexistent_returns_zero() {
3906 let agent = Agent::from_config(test_config()).await.unwrap();
3907 let session = agent.session(".", None).unwrap();
3908 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
3909 assert_eq!(count, 0);
3910 }
3911
3912 #[tokio::test(flavor = "multi_thread")]
3913 async fn test_session_with_mcp_manager_builds_ok() {
3914 use crate::mcp::manager::McpManager;
3915 let mcp = Arc::new(McpManager::new());
3916 let agent = Agent::from_config(test_config()).await.unwrap();
3917 let opts = SessionOptions::new().with_mcp(mcp);
3918 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
3920 assert!(!session.id().is_empty());
3921 }
3922
3923 #[test]
3924 fn test_session_command_is_pub() {
3925 use crate::SessionCommand;
3927 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
3928 }
3929
3930 #[tokio::test(flavor = "multi_thread")]
3931 async fn test_session_submit_with_queue_executes() {
3932 let agent = Agent::from_config(test_config()).await.unwrap();
3933 let qc = SessionQueueConfig::default();
3934 let opts = SessionOptions::new().with_queue_config(qc);
3935 let session = agent
3936 .session("/tmp/test-ws-submit-exec", Some(opts))
3937 .unwrap();
3938
3939 struct Echo(serde_json::Value);
3940 #[async_trait::async_trait]
3941 impl crate::queue::SessionCommand for Echo {
3942 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
3943 Ok(self.0.clone())
3944 }
3945 fn command_type(&self) -> &str {
3946 "echo"
3947 }
3948 }
3949
3950 let rx = session
3951 .submit(
3952 SessionLane::Query,
3953 Box::new(Echo(serde_json::json!({"ok": true}))),
3954 )
3955 .await
3956 .expect("submit should succeed with queue configured");
3957
3958 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
3959 .await
3960 .expect("timed out waiting for command result")
3961 .expect("channel closed before result")
3962 .expect("command returned an error");
3963
3964 assert_eq!(result["ok"], true);
3965 }
3966}