1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::{PlanningMode, SystemPromptSlots};
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::task::{ProgressTracker, TaskManager};
35use crate::text::truncate_utf8;
36use crate::tools::{ToolContext, ToolExecutor};
37use a3s_lane::{DeadLetter, MetricsSnapshot};
38use a3s_memory::{FileMemoryStore, MemoryStore};
39use anyhow::Context;
40use std::collections::HashMap;
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::sync::{Arc, RwLock};
44use tokio::sync::{broadcast, mpsc};
45use tokio::task::JoinHandle;
46
47fn safe_canonicalize(path: &Path) -> PathBuf {
50 match std::fs::canonicalize(path) {
51 Ok(p) => strip_unc_prefix(p),
52 Err(_) => path.to_path_buf(),
53 }
54}
55
56fn strip_unc_prefix(path: PathBuf) -> PathBuf {
59 #[cfg(windows)]
60 {
61 let s = path.to_string_lossy();
62 if let Some(stripped) = s.strip_prefix(r"\\?\") {
63 return PathBuf::from(stripped);
64 }
65 }
66 path
67}
68
69#[derive(Debug, Clone)]
75pub struct ToolCallResult {
76 pub name: String,
77 pub output: String,
78 pub exit_code: i32,
79 pub metadata: Option<serde_json::Value>,
80}
81
82#[derive(Clone, Default)]
88pub struct SessionOptions {
89 pub model: Option<String>,
91 pub agent_dirs: Vec<PathBuf>,
94 pub queue_config: Option<SessionQueueConfig>,
99 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
101 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
103 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
105 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
107 pub planning_mode: PlanningMode,
109 pub goal_tracking: bool,
111 pub skill_dirs: Vec<PathBuf>,
114 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
116 pub memory_store: Option<Arc<dyn MemoryStore>>,
118 pub(crate) file_memory_dir: Option<PathBuf>,
120 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
122 pub session_id: Option<String>,
124 pub auto_save: bool,
126 pub max_parse_retries: Option<u32>,
129 pub tool_timeout_ms: Option<u64>,
132 pub circuit_breaker_threshold: Option<u32>,
136 pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
144 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
150 pub auto_compact: bool,
152 pub auto_compact_threshold: Option<f32>,
155 pub continuation_enabled: Option<bool>,
158 pub max_continuation_turns: Option<u32>,
161 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
166 pub temperature: Option<f32>,
168 pub thinking_budget: Option<usize>,
170 pub max_tool_rounds: Option<usize>,
176 pub prompt_slots: Option<SystemPromptSlots>,
182 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
189 pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
198}
199
200impl std::fmt::Debug for SessionOptions {
201 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
202 f.debug_struct("SessionOptions")
203 .field("model", &self.model)
204 .field("agent_dirs", &self.agent_dirs)
205 .field("skill_dirs", &self.skill_dirs)
206 .field("queue_config", &self.queue_config)
207 .field("security_provider", &self.security_provider.is_some())
208 .field("context_providers", &self.context_providers.len())
209 .field("confirmation_manager", &self.confirmation_manager.is_some())
210 .field("permission_checker", &self.permission_checker.is_some())
211 .field("planning_mode", &self.planning_mode)
212 .field("goal_tracking", &self.goal_tracking)
213 .field(
214 "skill_registry",
215 &self
216 .skill_registry
217 .as_ref()
218 .map(|r| format!("{} skills", r.len())),
219 )
220 .field("memory_store", &self.memory_store.is_some())
221 .field("session_store", &self.session_store.is_some())
222 .field("session_id", &self.session_id)
223 .field("auto_save", &self.auto_save)
224 .field("max_parse_retries", &self.max_parse_retries)
225 .field("tool_timeout_ms", &self.tool_timeout_ms)
226 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
227 .field("sandbox_config", &self.sandbox_config)
228 .field("auto_compact", &self.auto_compact)
229 .field("auto_compact_threshold", &self.auto_compact_threshold)
230 .field("continuation_enabled", &self.continuation_enabled)
231 .field("max_continuation_turns", &self.max_continuation_turns)
232 .field(
233 "plugins",
234 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
235 )
236 .field("mcp_manager", &self.mcp_manager.is_some())
237 .field("temperature", &self.temperature)
238 .field("thinking_budget", &self.thinking_budget)
239 .field("max_tool_rounds", &self.max_tool_rounds)
240 .field("prompt_slots", &self.prompt_slots.is_some())
241 .finish()
242 }
243}
244
245impl SessionOptions {
246 pub fn new() -> Self {
247 Self::default()
248 }
249
250 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
255 self.plugins.push(std::sync::Arc::new(plugin));
256 self
257 }
258
259 pub fn with_model(mut self, model: impl Into<String>) -> Self {
260 self.model = Some(model.into());
261 self
262 }
263
264 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
265 self.agent_dirs.push(dir.into());
266 self
267 }
268
269 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
270 self.queue_config = Some(config);
271 self
272 }
273
274 pub fn with_default_security(mut self) -> Self {
276 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
277 self
278 }
279
280 pub fn with_security_provider(
282 mut self,
283 provider: Arc<dyn crate::security::SecurityProvider>,
284 ) -> Self {
285 self.security_provider = Some(provider);
286 self
287 }
288
289 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
291 let config = crate::context::FileSystemContextConfig::new(root_path);
292 self.context_providers
293 .push(Arc::new(crate::context::FileSystemContextProvider::new(
294 config,
295 )));
296 self
297 }
298
299 pub fn with_context_provider(
301 mut self,
302 provider: Arc<dyn crate::context::ContextProvider>,
303 ) -> Self {
304 self.context_providers.push(provider);
305 self
306 }
307
308 pub fn with_confirmation_manager(
310 mut self,
311 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
312 ) -> Self {
313 self.confirmation_manager = Some(manager);
314 self
315 }
316
317 pub fn with_permission_checker(
319 mut self,
320 checker: Arc<dyn crate::permissions::PermissionChecker>,
321 ) -> Self {
322 self.permission_checker = Some(checker);
323 self
324 }
325
326 pub fn with_permissive_policy(self) -> Self {
333 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
334 }
335
336 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
338 self.planning_mode = mode;
339 self
340 }
341
342 pub fn with_planning(mut self, enabled: bool) -> Self {
344 self.planning_mode = if enabled {
345 PlanningMode::Enabled
346 } else {
347 PlanningMode::Disabled
348 };
349 self
350 }
351
352 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
354 self.goal_tracking = enabled;
355 self
356 }
357
358 pub fn with_builtin_skills(mut self) -> Self {
360 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
361 self
362 }
363
364 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
366 self.skill_registry = Some(registry);
367 self
368 }
369
370 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
373 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
374 self
375 }
376
377 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
379 let registry = self
380 .skill_registry
381 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
382 if let Err(e) = registry.load_from_dir(&dir) {
383 tracing::warn!(
384 dir = %dir.as_ref().display(),
385 error = %e,
386 "Failed to load skills from directory — continuing without them"
387 );
388 }
389 self.skill_registry = Some(registry);
390 self
391 }
392
393 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
395 self.memory_store = Some(store);
396 self
397 }
398
399 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
405 self.file_memory_dir = Some(dir.into());
406 self
407 }
408
409 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
411 self.session_store = Some(store);
412 self
413 }
414
415 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
417 let dir = dir.into();
418 match tokio::runtime::Handle::try_current() {
419 Ok(handle) => {
420 match tokio::task::block_in_place(|| {
421 handle.block_on(crate::store::FileSessionStore::new(dir))
422 }) {
423 Ok(store) => {
424 self.session_store =
425 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
426 }
427 Err(e) => {
428 tracing::warn!("Failed to create file session store: {}", e);
429 }
430 }
431 }
432 Err(_) => {
433 tracing::warn!(
434 "No async runtime available for file session store — persistence disabled"
435 );
436 }
437 }
438 self
439 }
440
441 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
443 self.session_id = Some(id.into());
444 self
445 }
446
447 pub fn with_auto_save(mut self, enabled: bool) -> Self {
449 self.auto_save = enabled;
450 self
451 }
452
453 pub fn with_parse_retries(mut self, max: u32) -> Self {
459 self.max_parse_retries = Some(max);
460 self
461 }
462
463 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
469 self.tool_timeout_ms = Some(timeout_ms);
470 self
471 }
472
473 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
479 self.circuit_breaker_threshold = Some(threshold);
480 self
481 }
482
483 pub fn with_resilience_defaults(self) -> Self {
489 self.with_parse_retries(2)
490 .with_tool_timeout(120_000)
491 .with_circuit_breaker(3)
492 }
493
494 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
513 self.sandbox_config = Some(config);
514 self
515 }
516
517 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
525 self.sandbox_handle = Some(handle);
526 self
527 }
528
529 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
534 self.auto_compact = enabled;
535 self
536 }
537
538 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
540 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
541 self
542 }
543
544 pub fn with_continuation(mut self, enabled: bool) -> Self {
549 self.continuation_enabled = Some(enabled);
550 self
551 }
552
553 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
555 self.max_continuation_turns = Some(turns);
556 self
557 }
558
559 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
564 self.mcp_manager = Some(manager);
565 self
566 }
567
568 pub fn with_temperature(mut self, temperature: f32) -> Self {
569 self.temperature = Some(temperature);
570 self
571 }
572
573 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
574 self.thinking_budget = Some(budget);
575 self
576 }
577
578 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
583 self.max_tool_rounds = Some(rounds);
584 self
585 }
586
587 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
592 self.prompt_slots = Some(slots);
593 self
594 }
595
596 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
602 self.hook_executor = Some(executor);
603 self
604 }
605}
606
607pub struct Agent {
616 llm_client: Arc<dyn LlmClient>,
617 code_config: CodeConfig,
618 config: AgentConfig,
619 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
621 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
624}
625
626impl std::fmt::Debug for Agent {
627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
628 f.debug_struct("Agent").finish()
629 }
630}
631
632impl Agent {
633 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
637 let source = config_source.into();
638
639 let expanded = if let Some(rest) = source.strip_prefix("~/") {
641 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
642 if let Some(home) = home {
643 PathBuf::from(home).join(rest).display().to_string()
644 } else {
645 source.clone()
646 }
647 } else {
648 source.clone()
649 };
650
651 let path = Path::new(&expanded);
652
653 let config = if matches!(
654 path.extension().and_then(|ext| ext.to_str()),
655 Some("hcl" | "json")
656 ) {
657 if !path.exists() {
658 return Err(CodeError::Config(format!(
659 "Config file not found: {}",
660 path.display()
661 )));
662 }
663
664 CodeConfig::from_file(path)
665 .with_context(|| format!("Failed to load config: {}", path.display()))?
666 } else if matches!(path.extension().and_then(|ext| ext.to_str()), Some("acl")) {
667 if !path.exists() {
669 return Err(CodeError::Config(format!(
670 "Config file not found: {}",
671 path.display()
672 )));
673 }
674 let content = std::fs::read_to_string(path)
675 .map_err(|e| CodeError::Config(format!("Failed to read ACL file: {}", e)))?;
676 CodeConfig::from_acl(&content)
677 .with_context(|| format!("Failed to parse ACL config: {}", path.display()))?
678 } else if source.trim().starts_with('{') {
679 serde_json::from_str(&source)
681 .map_err(|e| CodeError::Config(format!("Failed to parse JSON config: {}", e)))?
682 } else if source.trim().starts_with("providers \"") {
683 CodeConfig::from_acl(&source).context("Failed to parse config as ACL string")?
685 } else {
686 CodeConfig::from_acl(&source).context("Failed to parse config as ACL string")?
688 };
689
690 Self::from_config(config).await
691 }
692
693 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
698 Self::new(config_source).await
699 }
700
701 pub async fn from_config(config: CodeConfig) -> Result<Self> {
703 let llm_config = config
704 .default_llm_config()
705 .context("default_model must be set in 'provider/model' format with a valid API key")?;
706 let llm_client = crate::llm::create_client_with_config(llm_config);
707
708 let agent_config = AgentConfig {
709 max_tool_rounds: config
710 .max_tool_rounds
711 .unwrap_or(AgentConfig::default().max_tool_rounds),
712 ..AgentConfig::default()
713 };
714
715 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
717 (None, vec![])
718 } else {
719 let manager = Arc::new(crate::mcp::manager::McpManager::new());
720 for server in &config.mcp_servers {
721 if !server.enabled {
722 continue;
723 }
724 manager.register_server(server.clone()).await;
725 if let Err(e) = manager.connect(&server.name).await {
726 tracing::warn!(
727 server = %server.name,
728 error = %e,
729 "Failed to connect to MCP server — skipping"
730 );
731 }
732 }
733 let tools = manager.get_all_tools().await;
735 (Some(manager), tools)
736 };
737
738 let mut agent = Agent {
739 llm_client,
740 code_config: config,
741 config: agent_config,
742 global_mcp,
743 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
744 };
745
746 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
748 for dir in &agent.code_config.skill_dirs.clone() {
749 if let Err(e) = registry.load_from_dir(dir) {
750 tracing::warn!(
751 dir = %dir.display(),
752 error = %e,
753 "Failed to load skills from directory — skipping"
754 );
755 }
756 }
757 agent.config.skill_registry = Some(registry);
758
759 Ok(agent)
760 }
761
762 pub async fn refresh_mcp_tools(&self) -> Result<()> {
770 if let Some(ref mcp) = self.global_mcp {
771 let fresh = mcp.get_all_tools().await;
772 *self
773 .global_mcp_tools
774 .lock()
775 .expect("global_mcp_tools lock poisoned") = fresh;
776 }
777 Ok(())
778 }
779
780 pub fn session(
785 &self,
786 workspace: impl Into<String>,
787 options: Option<SessionOptions>,
788 ) -> Result<AgentSession> {
789 let opts = options.unwrap_or_default();
790
791 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
794 (Some(global), Some(session)) => {
795 let global = Arc::clone(global);
796 let session_mgr = Arc::clone(session);
797 match tokio::runtime::Handle::try_current() {
798 Ok(handle) => {
799 let global_for_merge = Arc::clone(&global);
800 tokio::task::block_in_place(|| {
801 handle.block_on(async move {
802 for config in session_mgr.all_configs().await {
803 let name = config.name.clone();
804 global_for_merge.register_server(config).await;
805 if let Err(e) = global_for_merge.connect(&name).await {
806 tracing::warn!(
807 server = %name,
808 error = %e,
809 "Failed to connect session-level MCP server — skipping"
810 );
811 }
812 }
813 })
814 });
815 }
816 Err(_) => {
817 tracing::warn!(
818 "No async runtime available to merge session-level MCP servers \
819 into global manager — session MCP servers will not be available"
820 );
821 }
822 }
823 SessionOptions {
824 mcp_manager: Some(Arc::clone(&global)),
825 ..opts
826 }
827 }
828 (Some(global), None) => SessionOptions {
829 mcp_manager: Some(Arc::clone(global)),
830 ..opts
831 },
832 _ => opts,
833 };
834
835 let session_id = merged_opts
836 .session_id
837 .clone()
838 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
839 merged_opts.session_id = Some(session_id.clone());
840 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
841
842 self.build_session(workspace.into(), llm_client, &merged_opts)
843 }
844
845 pub fn session_for_agent(
860 &self,
861 workspace: impl Into<String>,
862 def: &crate::subagent::AgentDefinition,
863 extra: Option<SessionOptions>,
864 ) -> Result<AgentSession> {
865 let mut opts = extra.unwrap_or_default();
866
867 if opts.permission_checker.is_none()
869 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
870 {
871 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
872 }
873
874 if opts.max_tool_rounds.is_none() {
876 if let Some(steps) = def.max_steps {
877 opts.max_tool_rounds = Some(steps);
878 }
879 }
880
881 if opts.model.is_none() {
883 if let Some(ref m) = def.model {
884 let provider = m.provider.as_deref().unwrap_or("anthropic");
885 opts.model = Some(format!("{}/{}", provider, m.model));
886 }
887 }
888
889 if let Some(ref prompt) = def.prompt {
896 let slots = opts
897 .prompt_slots
898 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
899 if slots.extra.is_none() {
900 slots.extra = Some(prompt.clone());
901 }
902 }
903
904 self.session(workspace, Some(opts))
905 }
906
907 pub fn resume_session(
915 &self,
916 session_id: &str,
917 options: SessionOptions,
918 ) -> Result<AgentSession> {
919 let store = options.session_store.as_ref().ok_or_else(|| {
920 crate::error::CodeError::Session(
921 "resume_session requires a session_store in SessionOptions".to_string(),
922 )
923 })?;
924
925 let data = match tokio::runtime::Handle::try_current() {
927 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
928 .map_err(|e| {
929 crate::error::CodeError::Session(format!(
930 "Failed to load session {}: {}",
931 session_id, e
932 ))
933 })?,
934 Err(_) => {
935 return Err(crate::error::CodeError::Session(
936 "No async runtime available for session resume".to_string(),
937 ))
938 }
939 };
940
941 let data = data.ok_or_else(|| {
942 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
943 })?;
944
945 let mut opts = options;
947 opts.session_id = Some(data.id.clone());
948 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
949
950 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
951
952 *write_or_recover(&session.history) = data.messages;
954
955 Ok(session)
956 }
957
958 fn resolve_session_llm_client(
959 &self,
960 opts: &SessionOptions,
961 session_id: Option<&str>,
962 ) -> Result<Arc<dyn LlmClient>> {
963 let model_ref = if let Some(ref model) = opts.model {
964 model.as_str()
965 } else {
966 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
967 tracing::warn!(
968 "temperature/thinking_budget set without model override — these will be ignored. \
969 Use with_model() to apply LLM parameter overrides."
970 );
971 }
972 self.code_config
973 .default_model
974 .as_deref()
975 .context("default_model must be set in 'provider/model' format")?
976 };
977
978 let (provider_name, model_id) = model_ref
979 .split_once('/')
980 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
981
982 let mut llm_config = self
983 .code_config
984 .llm_config(provider_name, model_id)
985 .with_context(|| {
986 format!("provider '{provider_name}' or model '{model_id}' not found in config")
987 })?;
988
989 if opts.model.is_some() {
990 if let Some(temp) = opts.temperature {
991 llm_config = llm_config.with_temperature(temp);
992 }
993 if let Some(budget) = opts.thinking_budget {
994 llm_config = llm_config.with_thinking_budget(budget);
995 }
996 }
997
998 if let Some(session_id) = session_id {
999 llm_config = llm_config.with_session_id(session_id);
1000 }
1001
1002 Ok(crate::llm::create_client_with_config(llm_config))
1003 }
1004
1005 fn build_session(
1006 &self,
1007 workspace: String,
1008 llm_client: Arc<dyn LlmClient>,
1009 opts: &SessionOptions,
1010 ) -> Result<AgentSession> {
1011 let canonical = safe_canonicalize(Path::new(&workspace));
1012
1013 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
1014
1015 if let Some(ref search_config) = self.code_config.search {
1017 tool_executor
1018 .registry()
1019 .set_search_config(search_config.clone());
1020 }
1021
1022 let agent_registry = {
1026 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1027 use crate::tools::register_task_with_mcp;
1028 let registry = AgentRegistry::new();
1029 for dir in self
1030 .code_config
1031 .agent_dirs
1032 .iter()
1033 .chain(opts.agent_dirs.iter())
1034 {
1035 for agent in load_agents_from_dir(dir) {
1036 registry.register(agent);
1037 }
1038 }
1039 let registry = Arc::new(registry);
1040 register_task_with_mcp(
1041 tool_executor.registry(),
1042 Arc::clone(&llm_client),
1043 Arc::clone(®istry),
1044 canonical.display().to_string(),
1045 opts.mcp_manager.clone(),
1046 );
1047 registry
1048 };
1049
1050 if let Some(ref mcp) = opts.mcp_manager {
1053 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1056 Arc::as_ptr(mcp),
1057 self.global_mcp
1058 .as_ref()
1059 .map(Arc::as_ptr)
1060 .unwrap_or(std::ptr::null()),
1061 ) {
1062 self.global_mcp_tools
1064 .lock()
1065 .expect("global_mcp_tools lock poisoned")
1066 .clone()
1067 } else {
1068 match tokio::runtime::Handle::try_current() {
1070 Ok(handle) => {
1071 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1072 }
1073 Err(_) => {
1074 tracing::warn!(
1075 "No async runtime available for session-level MCP tools — \
1076 MCP tools will not be registered"
1077 );
1078 vec![]
1079 }
1080 }
1081 };
1082
1083 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1084 std::collections::HashMap::new();
1085 for (server, tool) in all_tools {
1086 by_server.entry(server).or_default().push(tool);
1087 }
1088 for (server_name, tools) in by_server {
1089 for tool in
1090 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1091 {
1092 tool_executor.register_dynamic_tool(tool);
1093 }
1094 }
1095 }
1096
1097 let tool_defs = tool_executor.definitions();
1098
1099 let mut prompt_slots = opts
1101 .prompt_slots
1102 .clone()
1103 .unwrap_or_else(|| self.config.prompt_slots.clone());
1104
1105 let agents_md_path = canonical.join("AGENTS.md");
1107 if agents_md_path.exists() && agents_md_path.is_file() {
1108 match std::fs::read_to_string(&agents_md_path) {
1109 Ok(content) if !content.trim().is_empty() => {
1110 tracing::info!(
1111 path = %agents_md_path.display(),
1112 "Auto-loaded AGENTS.md from workspace root"
1113 );
1114 prompt_slots.extra = match prompt_slots.extra {
1115 Some(existing) => Some(format!(
1116 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1117 existing, content
1118 )),
1119 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1120 };
1121 }
1122 Ok(_) => {
1123 tracing::debug!(
1124 path = %agents_md_path.display(),
1125 "AGENTS.md exists but is empty — skipping"
1126 );
1127 }
1128 Err(e) => {
1129 tracing::warn!(
1130 path = %agents_md_path.display(),
1131 error = %e,
1132 "Failed to read AGENTS.md — skipping"
1133 );
1134 }
1135 }
1136 }
1137
1138 let base_registry = self
1142 .config
1143 .skill_registry
1144 .as_deref()
1145 .map(|r| r.fork())
1146 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1147 if let Some(ref r) = opts.skill_registry {
1149 for skill in r.all() {
1150 base_registry.register_unchecked(skill);
1151 }
1152 }
1153 for dir in &opts.skill_dirs {
1155 if let Err(e) = base_registry.load_from_dir(dir) {
1156 tracing::warn!(
1157 dir = %dir.display(),
1158 error = %e,
1159 "Failed to load session skill dir — skipping"
1160 );
1161 }
1162 }
1163 let effective_registry = Arc::new(base_registry);
1164
1165 if !opts.plugins.is_empty() {
1168 use crate::plugin::PluginContext;
1169 let plugin_ctx = PluginContext::new()
1170 .with_llm(Arc::clone(&self.llm_client))
1171 .with_skill_registry(Arc::clone(&effective_registry));
1172 let plugin_registry = tool_executor.registry();
1173 for plugin in &opts.plugins {
1174 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1175 match plugin.load(plugin_registry, &plugin_ctx) {
1176 Ok(()) => {
1177 for skill in plugin.skills() {
1178 tracing::debug!(
1179 "Plugin '{}' registered skill '{}'",
1180 plugin.name(),
1181 skill.name
1182 );
1183 effective_registry.register_unchecked(skill);
1184 }
1185 }
1186 Err(e) => {
1187 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1188 }
1189 }
1190 }
1191 }
1192
1193 let skill_prompt = effective_registry.to_system_prompt();
1195 if !skill_prompt.is_empty() {
1196 prompt_slots.extra = match prompt_slots.extra {
1197 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1198 None => Some(skill_prompt),
1199 };
1200 }
1201
1202 let mut init_warning: Option<String> = None;
1204 let memory = {
1205 let store = if let Some(ref store) = opts.memory_store {
1206 Some(Arc::clone(store))
1207 } else if let Some(ref dir) = opts.file_memory_dir {
1208 match tokio::runtime::Handle::try_current() {
1209 Ok(handle) => {
1210 let dir = dir.clone();
1211 match tokio::task::block_in_place(|| {
1212 handle.block_on(FileMemoryStore::new(dir))
1213 }) {
1214 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1215 Err(e) => {
1216 let msg = format!("Failed to create file memory store: {}", e);
1217 tracing::warn!("{}", msg);
1218 init_warning = Some(msg);
1219 None
1220 }
1221 }
1222 }
1223 Err(_) => {
1224 let msg =
1225 "No async runtime available for file memory store — memory disabled"
1226 .to_string();
1227 tracing::warn!("{}", msg);
1228 init_warning = Some(msg);
1229 None
1230 }
1231 }
1232 } else {
1233 None
1234 };
1235 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1236 };
1237
1238 let base = self.config.clone();
1239 let config = AgentConfig {
1240 prompt_slots,
1241 tools: tool_defs,
1242 security_provider: opts.security_provider.clone(),
1243 permission_checker: opts.permission_checker.clone(),
1244 confirmation_manager: opts.confirmation_manager.clone(),
1245 context_providers: opts.context_providers.clone(),
1246 planning_mode: opts.planning_mode,
1247 goal_tracking: opts.goal_tracking,
1248 skill_registry: Some(Arc::clone(&effective_registry)),
1249 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1250 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1251 circuit_breaker_threshold: opts
1252 .circuit_breaker_threshold
1253 .unwrap_or(base.circuit_breaker_threshold),
1254 auto_compact: opts.auto_compact,
1255 auto_compact_threshold: opts
1256 .auto_compact_threshold
1257 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1258 max_context_tokens: base.max_context_tokens,
1259 llm_client: Some(Arc::clone(&llm_client)),
1260 memory: memory.clone(),
1261 continuation_enabled: opts
1262 .continuation_enabled
1263 .unwrap_or(base.continuation_enabled),
1264 max_continuation_turns: opts
1265 .max_continuation_turns
1266 .unwrap_or(base.max_continuation_turns),
1267 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1268 ..base
1269 };
1270
1271 {
1275 use crate::tools::register_skill;
1276 register_skill(
1277 tool_executor.registry(),
1278 Arc::clone(&llm_client),
1279 Arc::clone(&effective_registry),
1280 Arc::clone(&tool_executor),
1281 config.clone(),
1282 );
1283 }
1284
1285 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1288 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1289 let session_id = uuid::Uuid::new_v4().to_string();
1290 let rt = tokio::runtime::Handle::try_current();
1291
1292 match rt {
1293 Ok(handle) => {
1294 let queue = tokio::task::block_in_place(|| {
1296 handle.block_on(SessionLaneQueue::new(
1297 &session_id,
1298 queue_config.clone(),
1299 agent_event_tx.clone(),
1300 ))
1301 });
1302 match queue {
1303 Ok(q) => {
1304 let q = Arc::new(q);
1306 let q2 = Arc::clone(&q);
1307 tokio::task::block_in_place(|| {
1308 handle.block_on(async { q2.start().await.ok() })
1309 });
1310 Some(q)
1311 }
1312 Err(e) => {
1313 tracing::warn!("Failed to create session lane queue: {}", e);
1314 None
1315 }
1316 }
1317 }
1318 Err(_) => {
1319 tracing::warn!(
1320 "No async runtime available for queue creation — queue disabled"
1321 );
1322 None
1323 }
1324 }
1325 } else {
1326 None
1327 };
1328
1329 let mut tool_context = ToolContext::new(canonical.clone());
1331 if let Some(ref search_config) = self.code_config.search {
1332 tool_context = tool_context.with_search_config(search_config.clone());
1333 }
1334 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1335
1336 if let Some(handle) = opts.sandbox_handle.clone() {
1338 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1339 tool_context = tool_context.with_sandbox(handle);
1340 } else if opts.sandbox_config.is_some() {
1341 tracing::warn!(
1342 "sandbox_config is set but no sandbox_handle was provided \
1343 — bash commands will run locally"
1344 );
1345 }
1346
1347 let session_id = opts
1348 .session_id
1349 .clone()
1350 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1351
1352 let session_store = if opts.session_store.is_some() {
1354 opts.session_store.clone()
1355 } else if let Some(ref dir) = self.code_config.sessions_dir {
1356 match tokio::runtime::Handle::try_current() {
1357 Ok(handle) => {
1358 let dir = dir.clone();
1359 match tokio::task::block_in_place(|| {
1360 handle.block_on(crate::store::FileSessionStore::new(dir))
1361 }) {
1362 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1363 Err(e) => {
1364 tracing::warn!(
1365 "Failed to create session store from sessions_dir: {}",
1366 e
1367 );
1368 None
1369 }
1370 }
1371 }
1372 Err(_) => {
1373 tracing::warn!(
1374 "No async runtime for sessions_dir store — persistence disabled"
1375 );
1376 None
1377 }
1378 }
1379 } else {
1380 None
1381 };
1382
1383 let (cron_scheduler, cron_rx) = CronScheduler::new();
1387 let mut command_registry = CommandRegistry::new();
1388 command_registry.register(Arc::new(LoopCommand {
1389 scheduler: Arc::clone(&cron_scheduler),
1390 }));
1391 command_registry.register(Arc::new(CronListCommand {
1392 scheduler: Arc::clone(&cron_scheduler),
1393 }));
1394 command_registry.register(Arc::new(CronCancelCommand {
1395 scheduler: Arc::clone(&cron_scheduler),
1396 }));
1397
1398 Ok(AgentSession {
1399 llm_client,
1400 tool_executor,
1401 tool_context,
1402 memory: config.memory.clone(),
1403 config,
1404 workspace: canonical,
1405 session_id,
1406 history: RwLock::new(Vec::new()),
1407 command_queue,
1408 session_store,
1409 auto_save: opts.auto_save,
1410 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1411 ahp_executor: opts.hook_executor.clone(),
1412 init_warning,
1413 command_registry: std::sync::Mutex::new(command_registry),
1414 model_name: opts
1415 .model
1416 .clone()
1417 .or_else(|| self.code_config.default_model.clone())
1418 .unwrap_or_else(|| "unknown".to_string()),
1419 mcp_manager: opts
1420 .mcp_manager
1421 .clone()
1422 .or_else(|| self.global_mcp.clone())
1423 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1424 agent_registry,
1425 cron_scheduler,
1426 cron_rx: tokio::sync::Mutex::new(cron_rx),
1427 is_processing_cron: AtomicBool::new(false),
1428 cron_started: AtomicBool::new(false),
1429 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1430 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1431 task_manager: Arc::new(TaskManager::new()),
1432 progress_tracker: Arc::new(tokio::sync::RwLock::new(ProgressTracker::new(30))),
1433 })
1434 }
1435}
1436
1437#[derive(Debug, Clone)]
1446pub struct BtwResult {
1447 pub question: String,
1449 pub answer: String,
1451 pub usage: crate::llm::TokenUsage,
1453}
1454
1455pub struct AgentSession {
1464 llm_client: Arc<dyn LlmClient>,
1465 tool_executor: Arc<ToolExecutor>,
1466 tool_context: ToolContext,
1467 config: AgentConfig,
1468 workspace: PathBuf,
1469 session_id: String,
1471 history: RwLock<Vec<Message>>,
1473 command_queue: Option<Arc<SessionLaneQueue>>,
1475 memory: Option<Arc<crate::memory::AgentMemory>>,
1477 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1479 auto_save: bool,
1481 hook_engine: Arc<crate::hooks::HookEngine>,
1483 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1486 init_warning: Option<String>,
1488 command_registry: std::sync::Mutex<CommandRegistry>,
1491 model_name: String,
1493 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1495 agent_registry: Arc<crate::subagent::AgentRegistry>,
1497 cron_scheduler: Arc<CronScheduler>,
1499 cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
1501 is_processing_cron: AtomicBool,
1503 cron_started: AtomicBool,
1507 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1510 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1512 task_manager: Arc<TaskManager>,
1514 progress_tracker: Arc<tokio::sync::RwLock<ProgressTracker>>,
1516}
1517
1518#[derive(Debug, Clone)]
1519struct ActiveToolSnapshot {
1520 tool_name: String,
1521 started_at_ms: u64,
1522}
1523
1524impl std::fmt::Debug for AgentSession {
1525 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1526 f.debug_struct("AgentSession")
1527 .field("session_id", &self.session_id)
1528 .field("workspace", &self.workspace.display().to_string())
1529 .field("auto_save", &self.auto_save)
1530 .finish()
1531 }
1532}
1533
1534impl AgentSession {
1535 fn now_ms() -> u64 {
1536 std::time::SystemTime::now()
1537 .duration_since(std::time::UNIX_EPOCH)
1538 .map(|d| d.as_millis() as u64)
1539 .unwrap_or(0)
1540 }
1541
1542 fn compact_json_value(value: &serde_json::Value) -> String {
1543 let raw = match value {
1544 serde_json::Value::Null => String::new(),
1545 serde_json::Value::String(s) => s.clone(),
1546 _ => serde_json::to_string(value).unwrap_or_default(),
1547 };
1548 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1549 if compact.len() > 180 {
1550 format!("{}...", truncate_utf8(&compact, 180))
1551 } else {
1552 compact
1553 }
1554 }
1555
1556 async fn apply_runtime_event(
1557 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1558 event: &AgentEvent,
1559 ) {
1560 match event {
1561 AgentEvent::ToolStart { id, name } => {
1562 active_tools.write().await.insert(
1563 id.clone(),
1564 ActiveToolSnapshot {
1565 tool_name: name.clone(),
1566 started_at_ms: Self::now_ms(),
1567 },
1568 );
1569 }
1570 AgentEvent::ToolEnd { id, .. }
1571 | AgentEvent::PermissionDenied { tool_id: id, .. }
1572 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1573 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1574 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1575 active_tools.write().await.remove(id);
1576 }
1577 _ => {}
1578 }
1579 }
1580
1581 async fn clear_runtime_tracking(&self) {
1582 self.active_tools.write().await.clear();
1583 }
1584
1585 fn build_agent_loop(&self) -> AgentLoop {
1589 let mut config = self.config.clone();
1590 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1591 ahp.clone()
1592 } else {
1593 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1594 });
1595 config.tools = self.tool_executor.definitions();
1599 let mut agent_loop = AgentLoop::new(
1600 self.llm_client.clone(),
1601 self.tool_executor.clone(),
1602 self.tool_context.clone(),
1603 config,
1604 );
1605 if let Some(ref queue) = self.command_queue {
1606 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1607 }
1608 agent_loop = agent_loop.with_progress_tracker(Arc::clone(&self.progress_tracker));
1609 agent_loop = agent_loop.with_task_manager(Arc::clone(&self.task_manager));
1610 agent_loop
1611 }
1612
1613 fn build_command_context(&self) -> CommandContext {
1615 let history = read_or_recover(&self.history);
1616
1617 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1619
1620 let mut mcp_map: std::collections::HashMap<String, usize> =
1622 std::collections::HashMap::new();
1623 for name in &tool_names {
1624 if let Some(rest) = name.strip_prefix("mcp__") {
1625 if let Some((server, _)) = rest.split_once("__") {
1626 *mcp_map.entry(server.to_string()).or_default() += 1;
1627 }
1628 }
1629 }
1630 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1631 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1632
1633 CommandContext {
1634 session_id: self.session_id.clone(),
1635 workspace: self.workspace.display().to_string(),
1636 model: self.model_name.clone(),
1637 history_len: history.len(),
1638 total_tokens: 0,
1639 total_cost: 0.0,
1640 tool_names,
1641 mcp_servers,
1642 }
1643 }
1644
1645 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1649 self.command_registry
1650 .lock()
1651 .expect("command_registry lock poisoned")
1652 }
1653
1654 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1658 self.command_registry
1659 .lock()
1660 .expect("command_registry lock poisoned")
1661 .register(cmd);
1662 }
1663
1664 pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
1666 &self.cron_scheduler
1667 }
1668
1669 pub async fn close(&self) {
1671 let _ = self.cancel().await;
1672 self.cron_scheduler.stop();
1673 }
1674
1675 fn ensure_cron_started(&self) {
1680 if !self.cron_started.swap(true, Ordering::Relaxed) {
1681 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1682 }
1683 }
1684
1685 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1694 self.ensure_cron_started();
1697
1698 if CommandRegistry::is_command(prompt) {
1700 let ctx = self.build_command_context();
1701 let output = self.command_registry().dispatch(prompt, &ctx);
1702 if let Some(output) = output {
1704 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1706 let result = self.btw(question).await?;
1707 return Ok(AgentResult {
1708 text: result.answer,
1709 messages: history
1710 .map(|h| h.to_vec())
1711 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1712 tool_calls_count: 0,
1713 usage: result.usage,
1714 });
1715 }
1716 return Ok(AgentResult {
1717 text: output.text,
1718 messages: history
1719 .map(|h| h.to_vec())
1720 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1721 tool_calls_count: 0,
1722 usage: crate::llm::TokenUsage::default(),
1723 });
1724 }
1725 }
1726
1727 if let Some(ref w) = self.init_warning {
1728 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1729 }
1730 let agent_loop = self.build_agent_loop();
1731 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1732 let runtime_state = Arc::clone(&self.active_tools);
1733 let runtime_collector = tokio::spawn(async move {
1734 while let Some(event) = runtime_rx.recv().await {
1735 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1736 }
1737 });
1738
1739 let use_internal = history.is_none();
1740 let effective_history = match history {
1741 Some(h) => h.to_vec(),
1742 None => read_or_recover(&self.history).clone(),
1743 };
1744
1745 let cancel_token = tokio_util::sync::CancellationToken::new();
1746 *self.cancel_token.lock().await = Some(cancel_token.clone());
1747 let result = agent_loop
1748 .execute_with_session(
1749 &effective_history,
1750 prompt,
1751 Some(&self.session_id),
1752 Some(runtime_tx),
1753 Some(&cancel_token),
1754 )
1755 .await;
1756 *self.cancel_token.lock().await = None;
1757 let _ = runtime_collector.await;
1758 let result = result?;
1759
1760 if use_internal {
1763 *write_or_recover(&self.history) = result.messages.clone();
1764
1765 if self.auto_save {
1767 if let Err(e) = self.save().await {
1768 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1769 }
1770 }
1771 }
1772
1773 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1778 let fires = {
1779 let mut rx = self.cron_rx.lock().await;
1780 let mut fires: Vec<ScheduledFire> = Vec::new();
1781 while let Ok(fire) = rx.try_recv() {
1782 fires.push(fire);
1783 }
1784 fires
1785 };
1786 for fire in fires {
1787 tracing::debug!(
1788 task_id = %fire.task_id,
1789 "Firing scheduled cron task"
1790 );
1791 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1792 tracing::warn!(
1793 task_id = %fire.task_id,
1794 "Scheduled task failed: {e}"
1795 );
1796 }
1797 }
1798 self.is_processing_cron.store(false, Ordering::Relaxed);
1799 }
1800
1801 self.clear_runtime_tracking().await;
1802
1803 Ok(result)
1804 }
1805
1806 async fn build_btw_runtime_context(&self) -> String {
1807 let mut sections = Vec::new();
1808
1809 let active_tools = {
1810 let tools = self.active_tools.read().await;
1811 let mut items = tools
1812 .iter()
1813 .map(|(tool_id, tool)| {
1814 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1815 format!(
1816 "- {} [{}] running_for={}ms",
1817 tool.tool_name, tool_id, elapsed_ms
1818 )
1819 })
1820 .collect::<Vec<_>>();
1821 items.sort();
1822 items
1823 };
1824 if !active_tools.is_empty() {
1825 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1826 }
1827
1828 if let Some(cm) = &self.config.confirmation_manager {
1829 let pending = cm.pending_confirmations().await;
1830 if !pending.is_empty() {
1831 let mut lines = pending
1832 .into_iter()
1833 .map(
1834 |PendingConfirmationInfo {
1835 tool_id,
1836 tool_name,
1837 args,
1838 remaining_ms,
1839 }| {
1840 let arg_summary = Self::compact_json_value(&args);
1841 if arg_summary.is_empty() {
1842 format!(
1843 "- {} [{}] remaining={}ms",
1844 tool_name, tool_id, remaining_ms
1845 )
1846 } else {
1847 format!(
1848 "- {} [{}] remaining={}ms {}",
1849 tool_name, tool_id, remaining_ms, arg_summary
1850 )
1851 }
1852 },
1853 )
1854 .collect::<Vec<_>>();
1855 lines.sort();
1856 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1857 }
1858 }
1859
1860 if let Some(queue) = &self.command_queue {
1861 let stats = queue.stats().await;
1862 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1863 let mut lines = vec![format!(
1864 "active={}, pending={}, external_pending={}",
1865 stats.total_active, stats.total_pending, stats.external_pending
1866 )];
1867 let mut lanes = stats
1868 .lanes
1869 .into_values()
1870 .filter(|lane| lane.active > 0 || lane.pending > 0)
1871 .map(|lane| {
1872 format!(
1873 "- {:?}: active={}, pending={}, handler={:?}",
1874 lane.lane, lane.active, lane.pending, lane.handler_mode
1875 )
1876 })
1877 .collect::<Vec<_>>();
1878 lanes.sort();
1879 lines.extend(lanes);
1880 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1881 }
1882
1883 let external_tasks = queue.pending_external_tasks().await;
1884 if !external_tasks.is_empty() {
1885 let mut lines = external_tasks
1886 .into_iter()
1887 .take(6)
1888 .map(|task| {
1889 let payload_summary = Self::compact_json_value(&task.payload);
1890 if payload_summary.is_empty() {
1891 format!(
1892 "- {} {:?} remaining={}ms",
1893 task.command_type,
1894 task.lane,
1895 task.remaining_ms()
1896 )
1897 } else {
1898 format!(
1899 "- {} {:?} remaining={}ms {}",
1900 task.command_type,
1901 task.lane,
1902 task.remaining_ms(),
1903 payload_summary
1904 )
1905 }
1906 })
1907 .collect::<Vec<_>>();
1908 lines.sort();
1909 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1910 }
1911 }
1912
1913 if let Some(store) = &self.session_store {
1914 if let Ok(Some(session)) = store.load(&self.session_id).await {
1915 let active_tasks = session
1916 .tasks
1917 .into_iter()
1918 .filter(|task| task.status.is_active())
1919 .take(6)
1920 .map(|task| match task.tool {
1921 Some(tool) if !tool.is_empty() => {
1922 format!("- [{}] {} ({})", task.status, task.content, tool)
1923 }
1924 _ => format!("- [{}] {}", task.status, task.content),
1925 })
1926 .collect::<Vec<_>>();
1927 if !active_tasks.is_empty() {
1928 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1929 }
1930 }
1931 }
1932
1933 sections.join("\n\n")
1934 }
1935
1936 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
1954 self.btw_with_context(question, None).await
1955 }
1956
1957 pub async fn btw_with_context(
1962 &self,
1963 question: &str,
1964 runtime_context: Option<&str>,
1965 ) -> Result<BtwResult> {
1966 let question = question.trim();
1967 if question.is_empty() {
1968 return Err(crate::error::CodeError::Session(
1969 "btw: question cannot be empty".to_string(),
1970 ));
1971 }
1972
1973 let history_snapshot = read_or_recover(&self.history).clone();
1975
1976 let mut messages = history_snapshot;
1978 let mut injected_sections = Vec::new();
1979 let session_runtime = self.build_btw_runtime_context().await;
1980 if !session_runtime.is_empty() {
1981 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
1982 }
1983 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
1984 injected_sections.push(format!("[host runtime context]\n{}", extra));
1985 }
1986 if !injected_sections.is_empty() {
1987 let injected_context = format!(
1988 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
1989 injected_sections.join("\n\n")
1990 );
1991 messages.push(Message::user(&injected_context));
1992 }
1993 messages.push(Message::user(question));
1994
1995 let response = self
1996 .llm_client
1997 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
1998 .await
1999 .map_err(|e| {
2000 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2001 })?;
2002
2003 Ok(BtwResult {
2004 question: question.to_string(),
2005 answer: response.text(),
2006 usage: response.usage,
2007 })
2008 }
2009
2010 pub async fn send_with_attachments(
2015 &self,
2016 prompt: &str,
2017 attachments: &[crate::llm::Attachment],
2018 history: Option<&[Message]>,
2019 ) -> Result<AgentResult> {
2020 let use_internal = history.is_none();
2024 let mut effective_history = match history {
2025 Some(h) => h.to_vec(),
2026 None => read_or_recover(&self.history).clone(),
2027 };
2028 effective_history.push(Message::user_with_attachments(prompt, attachments));
2029
2030 let agent_loop = self.build_agent_loop();
2031 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2032 let runtime_state = Arc::clone(&self.active_tools);
2033 let runtime_collector = tokio::spawn(async move {
2034 while let Some(event) = runtime_rx.recv().await {
2035 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2036 }
2037 });
2038 let result = agent_loop
2039 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2040 .await?;
2041 let _ = runtime_collector.await;
2042
2043 if use_internal {
2044 *write_or_recover(&self.history) = result.messages.clone();
2045 if self.auto_save {
2046 if let Err(e) = self.save().await {
2047 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2048 }
2049 }
2050 }
2051
2052 self.clear_runtime_tracking().await;
2053
2054 Ok(result)
2055 }
2056
2057 pub async fn stream_with_attachments(
2062 &self,
2063 prompt: &str,
2064 attachments: &[crate::llm::Attachment],
2065 history: Option<&[Message]>,
2066 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2067 let (tx, rx) = mpsc::channel(256);
2068 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2069 let mut effective_history = match history {
2070 Some(h) => h.to_vec(),
2071 None => read_or_recover(&self.history).clone(),
2072 };
2073 effective_history.push(Message::user_with_attachments(prompt, attachments));
2074
2075 let agent_loop = self.build_agent_loop();
2076 let runtime_state = Arc::clone(&self.active_tools);
2077 let forwarder = tokio::spawn(async move {
2078 while let Some(event) = runtime_rx.recv().await {
2079 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2080 if tx.send(event).await.is_err() {
2081 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2084 break;
2085 }
2086 }
2087 });
2088 let handle = tokio::spawn(async move {
2089 let _ = agent_loop
2090 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2091 .await;
2092 });
2093 let active_tools = Arc::clone(&self.active_tools);
2094 let wrapped_handle = tokio::spawn(async move {
2095 let _ = handle.await;
2096 let _ = forwarder.await;
2097 active_tools.write().await.clear();
2098 });
2099
2100 Ok((rx, wrapped_handle))
2101 }
2102
2103 pub async fn stream(
2113 &self,
2114 prompt: &str,
2115 history: Option<&[Message]>,
2116 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2117 self.ensure_cron_started();
2118
2119 if CommandRegistry::is_command(prompt) {
2121 let ctx = self.build_command_context();
2122 let output = self.command_registry().dispatch(prompt, &ctx);
2123 if let Some(output) = output {
2125 let (tx, rx) = mpsc::channel(256);
2126
2127 if let Some(CommandAction::BtwQuery(question)) = output.action {
2129 let llm_client = self.llm_client.clone();
2131 let history_snapshot = read_or_recover(&self.history).clone();
2132 let handle = tokio::spawn(async move {
2133 let mut messages = history_snapshot;
2134 messages.push(Message::user(&question));
2135 match llm_client
2136 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2137 .await
2138 {
2139 Ok(response) => {
2140 let answer = response.text();
2141 let _ = tx
2142 .send(AgentEvent::BtwAnswer {
2143 question: question.clone(),
2144 answer: answer.clone(),
2145 usage: response.usage,
2146 })
2147 .await;
2148 let _ = tx
2149 .send(AgentEvent::End {
2150 text: answer,
2151 usage: crate::llm::TokenUsage::default(),
2152 meta: None,
2153 })
2154 .await;
2155 }
2156 Err(e) => {
2157 let _ = tx
2158 .send(AgentEvent::Error {
2159 message: format!("btw failed: {e}"),
2160 })
2161 .await;
2162 }
2163 }
2164 });
2165 return Ok((rx, handle));
2166 }
2167
2168 let handle = tokio::spawn(async move {
2169 let _ = tx
2170 .send(AgentEvent::TextDelta {
2171 text: output.text.clone(),
2172 })
2173 .await;
2174 let _ = tx
2175 .send(AgentEvent::End {
2176 text: output.text.clone(),
2177 usage: crate::llm::TokenUsage::default(),
2178 meta: None,
2179 })
2180 .await;
2181 });
2182 return Ok((rx, handle));
2183 }
2184 }
2185
2186 let (tx, rx) = mpsc::channel(256);
2187 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2188 let agent_loop = self.build_agent_loop();
2189 let effective_history = match history {
2190 Some(h) => h.to_vec(),
2191 None => read_or_recover(&self.history).clone(),
2192 };
2193 let prompt = prompt.to_string();
2194 let session_id = self.session_id.clone();
2195
2196 let cancel_token = tokio_util::sync::CancellationToken::new();
2197 *self.cancel_token.lock().await = Some(cancel_token.clone());
2198 let token_clone = cancel_token.clone();
2199 let runtime_state = Arc::clone(&self.active_tools);
2200 let forwarder = tokio::spawn(async move {
2201 while let Some(event) = runtime_rx.recv().await {
2202 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2203 if tx.send(event).await.is_err() {
2204 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2207 break;
2208 }
2209 }
2210 });
2211
2212 let handle = tokio::spawn(async move {
2213 let _ = agent_loop
2214 .execute_with_session(
2215 &effective_history,
2216 &prompt,
2217 Some(&session_id),
2218 Some(runtime_tx),
2219 Some(&token_clone),
2220 )
2221 .await;
2222 });
2223
2224 let cancel_token_ref = self.cancel_token.clone();
2226 let active_tools = Arc::clone(&self.active_tools);
2227 let wrapped_handle = tokio::spawn(async move {
2228 let _ = handle.await;
2229 let _ = forwarder.await;
2230 *cancel_token_ref.lock().await = None;
2231 active_tools.write().await.clear();
2232 });
2233
2234 Ok((rx, wrapped_handle))
2235 }
2236
2237 pub async fn cancel(&self) -> bool {
2244 let token = self.cancel_token.lock().await.clone();
2245 if let Some(token) = token {
2246 token.cancel();
2247 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2248 true
2249 } else {
2250 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2251 false
2252 }
2253 }
2254
2255 pub fn history(&self) -> Vec<Message> {
2257 read_or_recover(&self.history).clone()
2258 }
2259
2260 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2262 self.memory.as_ref()
2263 }
2264
2265 pub fn id(&self) -> &str {
2267 &self.session_id
2268 }
2269
2270 pub fn workspace(&self) -> &std::path::Path {
2272 &self.workspace
2273 }
2274
2275 pub fn init_warning(&self) -> Option<&str> {
2277 self.init_warning.as_deref()
2278 }
2279
2280 pub fn session_id(&self) -> &str {
2282 &self.session_id
2283 }
2284
2285 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2291 self.tool_executor.definitions()
2292 }
2293
2294 pub fn tool_names(&self) -> Vec<String> {
2300 self.tool_executor
2301 .definitions()
2302 .into_iter()
2303 .map(|t| t.name)
2304 .collect()
2305 }
2306
2307 pub fn task_manager(&self) -> &Arc<TaskManager> {
2316 &self.task_manager
2317 }
2318
2319 pub fn spawn_task(&self, task: crate::task::Task) -> crate::task::TaskId {
2334 self.task_manager.spawn(task)
2335 }
2336
2337 pub fn track_tool_call(&self, tool_name: &str, args_summary: &str, success: bool) {
2341 if let Ok(mut guard) = self.progress_tracker.try_write() {
2342 guard.track_tool_call(tool_name, args_summary, success);
2343 }
2344 }
2345
2346 pub async fn get_progress(&self) -> crate::task::AgentProgress {
2350 self.progress_tracker.read().await.progress()
2351 }
2352
2353 pub fn subscribe_tasks(
2357 &self,
2358 task_id: crate::task::TaskId,
2359 ) -> Option<tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent>> {
2360 self.task_manager.subscribe(task_id)
2361 }
2362
2363 pub fn subscribe_all_tasks(
2365 &self,
2366 ) -> tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent> {
2367 self.task_manager.subscribe_all()
2368 }
2369
2370 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2376 self.hook_engine.register(hook);
2377 }
2378
2379 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2381 self.hook_engine.unregister(hook_id)
2382 }
2383
2384 pub fn register_hook_handler(
2386 &self,
2387 hook_id: &str,
2388 handler: Arc<dyn crate::hooks::HookHandler>,
2389 ) {
2390 self.hook_engine.register_handler(hook_id, handler);
2391 }
2392
2393 pub fn unregister_hook_handler(&self, hook_id: &str) {
2395 self.hook_engine.unregister_handler(hook_id);
2396 }
2397
2398 pub fn hook_count(&self) -> usize {
2400 self.hook_engine.hook_count()
2401 }
2402
2403 pub async fn save(&self) -> Result<()> {
2407 let store = match &self.session_store {
2408 Some(s) => s,
2409 None => return Ok(()),
2410 };
2411
2412 let history = read_or_recover(&self.history).clone();
2413 let now = chrono::Utc::now().timestamp();
2414
2415 let data = crate::store::SessionData {
2416 id: self.session_id.clone(),
2417 config: crate::session::SessionConfig {
2418 name: String::new(),
2419 workspace: self.workspace.display().to_string(),
2420 system_prompt: Some(self.config.prompt_slots.build()),
2421 max_context_length: 200_000,
2422 auto_compact: false,
2423 auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
2424 storage_type: crate::config::StorageBackend::File,
2425 queue_config: None,
2426 confirmation_policy: None,
2427 permission_policy: None,
2428 parent_id: None,
2429 security_config: None,
2430 hook_engine: None,
2431 planning_mode: self.config.planning_mode,
2432 goal_tracking: self.config.goal_tracking,
2433 },
2434 state: crate::session::SessionState::Active,
2435 messages: history,
2436 context_usage: crate::session::ContextUsage::default(),
2437 total_usage: crate::llm::TokenUsage::default(),
2438 total_cost: 0.0,
2439 model_name: None,
2440 cost_records: Vec::new(),
2441 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
2442 thinking_enabled: false,
2443 thinking_budget: None,
2444 created_at: now,
2445 updated_at: now,
2446 llm_config: None,
2447 tasks: Vec::new(),
2448 parent_id: None,
2449 };
2450
2451 store.save(&data).await?;
2452 tracing::debug!("Session {} saved", self.session_id);
2453 Ok(())
2454 }
2455
2456 pub async fn read_file(&self, path: &str) -> Result<String> {
2458 let args = serde_json::json!({ "file_path": path });
2459 let result = self.tool_executor.execute("read", &args).await?;
2460 Ok(result.output)
2461 }
2462
2463 pub async fn bash(&self, command: &str) -> Result<String> {
2468 let args = serde_json::json!({ "command": command });
2469 let result = self
2470 .tool_executor
2471 .execute_with_context("bash", &args, &self.tool_context)
2472 .await?;
2473 Ok(result.output)
2474 }
2475
2476 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2478 let args = serde_json::json!({ "pattern": pattern });
2479 let result = self.tool_executor.execute("glob", &args).await?;
2480 let files: Vec<String> = result
2481 .output
2482 .lines()
2483 .filter(|l| !l.is_empty())
2484 .map(|l| l.to_string())
2485 .collect();
2486 Ok(files)
2487 }
2488
2489 pub async fn grep(&self, pattern: &str) -> Result<String> {
2491 let args = serde_json::json!({ "pattern": pattern });
2492 let result = self.tool_executor.execute("grep", &args).await?;
2493 Ok(result.output)
2494 }
2495
2496 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2498 let result = self.tool_executor.execute(name, &args).await?;
2499 Ok(ToolCallResult {
2500 name: name.to_string(),
2501 output: result.output,
2502 exit_code: result.exit_code,
2503 metadata: result.metadata,
2504 })
2505 }
2506
2507 pub fn has_queue(&self) -> bool {
2513 self.command_queue.is_some()
2514 }
2515
2516 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2520 if let Some(ref queue) = self.command_queue {
2521 queue.set_lane_handler(lane, config).await;
2522 }
2523 }
2524
2525 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2529 if let Some(ref queue) = self.command_queue {
2530 queue.complete_external_task(task_id, result).await
2531 } else {
2532 false
2533 }
2534 }
2535
2536 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2538 if let Some(ref queue) = self.command_queue {
2539 queue.pending_external_tasks().await
2540 } else {
2541 Vec::new()
2542 }
2543 }
2544
2545 pub async fn queue_stats(&self) -> SessionQueueStats {
2547 if let Some(ref queue) = self.command_queue {
2548 queue.stats().await
2549 } else {
2550 SessionQueueStats::default()
2551 }
2552 }
2553
2554 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2556 if let Some(ref queue) = self.command_queue {
2557 queue.metrics_snapshot().await
2558 } else {
2559 None
2560 }
2561 }
2562
2563 pub async fn submit(
2569 &self,
2570 lane: SessionLane,
2571 command: Box<dyn crate::queue::SessionCommand>,
2572 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2573 let queue = self
2574 .command_queue
2575 .as_ref()
2576 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2577 Ok(queue.submit(lane, command).await)
2578 }
2579
2580 pub async fn submit_batch(
2587 &self,
2588 lane: SessionLane,
2589 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2590 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2591 {
2592 let queue = self
2593 .command_queue
2594 .as_ref()
2595 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2596 Ok(queue.submit_batch(lane, commands).await)
2597 }
2598
2599 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2601 if let Some(ref queue) = self.command_queue {
2602 queue.dead_letters().await
2603 } else {
2604 Vec::new()
2605 }
2606 }
2607
2608 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2621 use crate::subagent::load_agents_from_dir;
2622 let agents = load_agents_from_dir(dir);
2623 let count = agents.len();
2624 for agent in agents {
2625 tracing::info!(
2626 session_id = %self.session_id,
2627 agent = agent.name,
2628 dir = %dir.display(),
2629 "Dynamically registered agent"
2630 );
2631 self.agent_registry.register(agent);
2632 }
2633 count
2634 }
2635
2636 pub async fn add_mcp_server(
2643 &self,
2644 config: crate::mcp::McpServerConfig,
2645 ) -> crate::error::Result<usize> {
2646 let server_name = config.name.clone();
2647 self.mcp_manager.register_server(config).await;
2648 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2649 crate::error::CodeError::Tool {
2650 tool: server_name.clone(),
2651 message: format!("Failed to connect MCP server: {}", e),
2652 }
2653 })?;
2654
2655 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2656 let count = tools.len();
2657
2658 for tool in
2659 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2660 {
2661 self.tool_executor.register_dynamic_tool(tool);
2662 }
2663
2664 tracing::info!(
2665 session_id = %self.session_id,
2666 server = server_name,
2667 tools = count,
2668 "MCP server added to live session"
2669 );
2670
2671 Ok(count)
2672 }
2673
2674 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2679 self.tool_executor
2680 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2681 self.mcp_manager
2682 .disconnect(server_name)
2683 .await
2684 .map_err(|e| crate::error::CodeError::Tool {
2685 tool: server_name.to_string(),
2686 message: format!("Failed to disconnect MCP server: {}", e),
2687 })?;
2688 tracing::info!(
2689 session_id = %self.session_id,
2690 server = server_name,
2691 "MCP server removed from live session"
2692 );
2693 Ok(())
2694 }
2695
2696 pub async fn mcp_status(
2698 &self,
2699 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2700 self.mcp_manager.get_status().await
2701 }
2702}
2703
2704#[cfg(test)]
2709mod tests {
2710 use super::*;
2711 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2712 use crate::store::SessionStore;
2713
2714 #[tokio::test]
2715 async fn test_session_submit_no_queue_returns_err() {
2716 let agent = Agent::from_config(test_config()).await.unwrap();
2717 let session = agent.session(".", None).unwrap();
2718 struct Noop;
2719 #[async_trait::async_trait]
2720 impl crate::queue::SessionCommand for Noop {
2721 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2722 Ok(serde_json::json!(null))
2723 }
2724 fn command_type(&self) -> &str {
2725 "noop"
2726 }
2727 }
2728 let result: anyhow::Result<
2729 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2730 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2731 assert!(result.is_err());
2732 assert!(result.unwrap_err().to_string().contains("No queue"));
2733 }
2734
2735 #[tokio::test]
2736 async fn test_session_submit_batch_no_queue_returns_err() {
2737 let agent = Agent::from_config(test_config()).await.unwrap();
2738 let session = agent.session(".", None).unwrap();
2739 struct Noop;
2740 #[async_trait::async_trait]
2741 impl crate::queue::SessionCommand for Noop {
2742 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2743 Ok(serde_json::json!(null))
2744 }
2745 fn command_type(&self) -> &str {
2746 "noop"
2747 }
2748 }
2749 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2750 let result: anyhow::Result<
2751 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2752 > = session.submit_batch(SessionLane::Query, cmds).await;
2753 assert!(result.is_err());
2754 assert!(result.unwrap_err().to_string().contains("No queue"));
2755 }
2756
2757 fn test_config() -> CodeConfig {
2758 CodeConfig {
2759 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2760 providers: vec![
2761 ProviderConfig {
2762 name: "anthropic".to_string(),
2763 api_key: Some("test-key".to_string()),
2764 base_url: None,
2765 headers: std::collections::HashMap::new(),
2766 session_id_header: None,
2767 models: vec![ModelConfig {
2768 id: "claude-sonnet-4-20250514".to_string(),
2769 name: "Claude Sonnet 4".to_string(),
2770 family: "claude-sonnet".to_string(),
2771 api_key: None,
2772 base_url: None,
2773 headers: std::collections::HashMap::new(),
2774 session_id_header: None,
2775 attachment: false,
2776 reasoning: false,
2777 tool_call: true,
2778 temperature: true,
2779 release_date: None,
2780 modalities: ModelModalities::default(),
2781 cost: Default::default(),
2782 limit: Default::default(),
2783 }],
2784 },
2785 ProviderConfig {
2786 name: "openai".to_string(),
2787 api_key: Some("test-openai-key".to_string()),
2788 base_url: None,
2789 headers: std::collections::HashMap::new(),
2790 session_id_header: None,
2791 models: vec![ModelConfig {
2792 id: "gpt-4o".to_string(),
2793 name: "GPT-4o".to_string(),
2794 family: "gpt-4".to_string(),
2795 api_key: None,
2796 base_url: None,
2797 headers: std::collections::HashMap::new(),
2798 session_id_header: None,
2799 attachment: false,
2800 reasoning: false,
2801 tool_call: true,
2802 temperature: true,
2803 release_date: None,
2804 modalities: ModelModalities::default(),
2805 cost: Default::default(),
2806 limit: Default::default(),
2807 }],
2808 },
2809 ],
2810 ..Default::default()
2811 }
2812 }
2813
2814 fn build_effective_registry_for_test(
2815 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2816 opts: &SessionOptions,
2817 ) -> Arc<crate::skills::SkillRegistry> {
2818 let base_registry = agent_registry
2819 .as_deref()
2820 .map(|r| r.fork())
2821 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2822 if let Some(ref r) = opts.skill_registry {
2823 for skill in r.all() {
2824 base_registry.register_unchecked(skill);
2825 }
2826 }
2827 for dir in &opts.skill_dirs {
2828 if let Err(e) = base_registry.load_from_dir(dir) {
2829 tracing::warn!(
2830 dir = %dir.display(),
2831 error = %e,
2832 "Failed to load session skill dir — skipping"
2833 );
2834 }
2835 }
2836 Arc::new(base_registry)
2837 }
2838
2839 #[tokio::test]
2840 async fn test_from_config() {
2841 let agent = Agent::from_config(test_config()).await;
2842 assert!(agent.is_ok());
2843 }
2844
2845 #[tokio::test]
2846 async fn test_session_default() {
2847 let agent = Agent::from_config(test_config()).await.unwrap();
2848 let session = agent.session("/tmp/test-workspace", None);
2849 assert!(session.is_ok());
2850 let debug = format!("{:?}", session.unwrap());
2851 assert!(debug.contains("AgentSession"));
2852 }
2853
2854 #[tokio::test]
2855 async fn test_session_registers_agentic_tools_by_default() {
2856 let agent = Agent::from_config(test_config()).await.unwrap();
2859 let _session = agent.session("/tmp/test-workspace", None).unwrap();
2860 }
2862
2863 #[tokio::test]
2864 async fn test_session_can_disable_agentic_tools_via_config() {
2865 let mut config = test_config();
2868 config.agentic_search = Some(crate::config::AgenticSearchConfig {
2869 enabled: false,
2870 ..Default::default()
2871 });
2872 config.agentic_parse = Some(crate::config::AgenticParseConfig {
2873 enabled: false,
2874 ..Default::default()
2875 });
2876
2877 let agent = Agent::from_config(config).await.unwrap();
2878 let _session = agent.session("/tmp/test-workspace", None).unwrap();
2879 }
2881
2882 #[tokio::test]
2883 async fn test_session_with_model_override() {
2884 let agent = Agent::from_config(test_config()).await.unwrap();
2885 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2886 let session = agent.session("/tmp/test-workspace", Some(opts));
2887 assert!(session.is_ok());
2888 }
2889
2890 #[tokio::test]
2891 async fn test_session_with_invalid_model_format() {
2892 let agent = Agent::from_config(test_config()).await.unwrap();
2893 let opts = SessionOptions::new().with_model("gpt-4o");
2894 let session = agent.session("/tmp/test-workspace", Some(opts));
2895 assert!(session.is_err());
2896 }
2897
2898 #[tokio::test]
2899 async fn test_session_with_model_not_found() {
2900 let agent = Agent::from_config(test_config()).await.unwrap();
2901 let opts = SessionOptions::new().with_model("openai/nonexistent");
2902 let session = agent.session("/tmp/test-workspace", Some(opts));
2903 assert!(session.is_err());
2904 }
2905
2906 #[tokio::test]
2907 async fn test_session_preserves_skill_scorer_from_agent_registry() {
2908 use crate::skills::feedback::{
2909 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
2910 };
2911 use crate::skills::{Skill, SkillKind, SkillRegistry};
2912
2913 let registry = Arc::new(SkillRegistry::new());
2914 let scorer = Arc::new(DefaultSkillScorer::default());
2915 registry.set_scorer(scorer.clone());
2916
2917 registry.register_unchecked(Arc::new(Skill {
2918 name: "healthy-skill".to_string(),
2919 description: "healthy".to_string(),
2920 allowed_tools: None,
2921 disable_model_invocation: false,
2922 kind: SkillKind::Instruction,
2923 content: "healthy".to_string(),
2924 tags: vec![],
2925 version: None,
2926 }));
2927 registry.register_unchecked(Arc::new(Skill {
2928 name: "disabled-skill".to_string(),
2929 description: "disabled".to_string(),
2930 allowed_tools: None,
2931 disable_model_invocation: false,
2932 kind: SkillKind::Instruction,
2933 content: "disabled".to_string(),
2934 tags: vec![],
2935 version: None,
2936 }));
2937
2938 for _ in 0..5 {
2939 scorer.record(SkillFeedback {
2940 skill_name: "disabled-skill".to_string(),
2941 outcome: SkillOutcome::Failure,
2942 score_delta: -1.0,
2943 reason: "bad".to_string(),
2944 timestamp: 0,
2945 });
2946 }
2947
2948 let effective_registry =
2949 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
2950 let prompt = effective_registry.to_system_prompt();
2951
2952 assert!(prompt.contains("healthy-skill"));
2953 assert!(!prompt.contains("disabled-skill"));
2954 }
2955
2956 #[tokio::test]
2957 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
2958 use crate::skills::validator::DefaultSkillValidator;
2959 use crate::skills::SkillRegistry;
2960
2961 let registry = Arc::new(SkillRegistry::new());
2962 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
2963
2964 let temp_dir = tempfile::tempdir().unwrap();
2965 let invalid_skill = temp_dir.path().join("invalid.md");
2966 std::fs::write(
2967 &invalid_skill,
2968 r#"---
2969name: BadName
2970description: "invalid skill name"
2971kind: instruction
2972---
2973# Invalid Skill
2974"#,
2975 )
2976 .unwrap();
2977
2978 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
2979 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
2980 assert!(effective_registry.get("BadName").is_none());
2981 }
2982
2983 #[tokio::test]
2984 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
2985 use crate::skills::{Skill, SkillKind, SkillRegistry};
2986
2987 let registry = Arc::new(SkillRegistry::new());
2988 registry.register_unchecked(Arc::new(Skill {
2989 name: "shared-skill".to_string(),
2990 description: "agent level".to_string(),
2991 allowed_tools: None,
2992 disable_model_invocation: false,
2993 kind: SkillKind::Instruction,
2994 content: "agent content".to_string(),
2995 tags: vec![],
2996 version: None,
2997 }));
2998
2999 let session_registry = Arc::new(SkillRegistry::new());
3000 session_registry.register_unchecked(Arc::new(Skill {
3001 name: "shared-skill".to_string(),
3002 description: "session level".to_string(),
3003 allowed_tools: None,
3004 disable_model_invocation: false,
3005 kind: SkillKind::Instruction,
3006 content: "session content".to_string(),
3007 tags: vec![],
3008 version: None,
3009 }));
3010
3011 let opts = SessionOptions::new().with_skill_registry(session_registry);
3012 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
3013
3014 assert_eq!(
3015 effective_registry.get("shared-skill").unwrap().content,
3016 "session content"
3017 );
3018 assert_eq!(
3019 registry.get("shared-skill").unwrap().content,
3020 "agent content"
3021 );
3022 }
3023
3024 #[tokio::test]
3025 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3026 use crate::skills::{Skill, SkillKind, SkillRegistry};
3027
3028 let session_registry = Arc::new(SkillRegistry::new());
3029 session_registry.register_unchecked(Arc::new(Skill {
3030 name: "shared-skill".to_string(),
3031 description: "session registry".to_string(),
3032 allowed_tools: None,
3033 disable_model_invocation: false,
3034 kind: SkillKind::Instruction,
3035 content: "registry content".to_string(),
3036 tags: vec![],
3037 version: None,
3038 }));
3039
3040 let temp_dir = tempfile::tempdir().unwrap();
3041 std::fs::write(
3042 temp_dir.path().join("shared.md"),
3043 r#"---
3044name: shared-skill
3045description: "skill dir override"
3046kind: instruction
3047---
3048# Shared Skill
3049dir content
3050"#,
3051 )
3052 .unwrap();
3053 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3054
3055 let opts = SessionOptions::new()
3056 .with_skill_registry(session_registry)
3057 .with_skill_dirs([temp_dir.path()]);
3058 let effective_registry = build_effective_registry_for_test(None, &opts);
3059
3060 assert_eq!(
3061 effective_registry.get("shared-skill").unwrap().description,
3062 "skill dir override"
3063 );
3064 assert!(effective_registry.get("README").is_none());
3065 }
3066
3067 #[tokio::test]
3068 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3069 use crate::plugin::{Plugin, PluginContext};
3070 use crate::skills::{Skill, SkillKind, SkillRegistry};
3071 use crate::tools::ToolRegistry;
3072
3073 struct SessionOnlySkillPlugin;
3074
3075 impl Plugin for SessionOnlySkillPlugin {
3076 fn name(&self) -> &str {
3077 "session-only-skill"
3078 }
3079
3080 fn version(&self) -> &str {
3081 "0.1.0"
3082 }
3083
3084 fn tool_names(&self) -> &[&str] {
3085 &[]
3086 }
3087
3088 fn load(
3089 &self,
3090 _registry: &Arc<ToolRegistry>,
3091 _ctx: &PluginContext,
3092 ) -> anyhow::Result<()> {
3093 Ok(())
3094 }
3095
3096 fn skills(&self) -> Vec<Arc<Skill>> {
3097 vec![Arc::new(Skill {
3098 name: "plugin-session-skill".to_string(),
3099 description: "plugin skill".to_string(),
3100 allowed_tools: None,
3101 disable_model_invocation: false,
3102 kind: SkillKind::Instruction,
3103 content: "plugin content".to_string(),
3104 tags: vec!["plugin".to_string()],
3105 version: None,
3106 })]
3107 }
3108 }
3109
3110 let mut agent = Agent::from_config(test_config()).await.unwrap();
3111 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3112 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3113
3114 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3115 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3116
3117 let session_registry = session.config.skill_registry.as_ref().unwrap();
3118 assert!(session_registry.get("plugin-session-skill").is_some());
3119 assert!(agent_registry.get("plugin-session-skill").is_none());
3120 }
3121
3122 #[tokio::test]
3123 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3124 use crate::skills::{Skill, SkillKind, SkillRegistry};
3125
3126 let mut agent = Agent::from_config(test_config()).await.unwrap();
3127 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3128 agent.config.skill_registry = Some(agent_registry);
3129
3130 let session_registry = Arc::new(SkillRegistry::new());
3131 session_registry.register_unchecked(Arc::new(Skill {
3132 name: "session-only".to_string(),
3133 description: "only for first session".to_string(),
3134 allowed_tools: None,
3135 disable_model_invocation: false,
3136 kind: SkillKind::Instruction,
3137 content: "session one".to_string(),
3138 tags: vec![],
3139 version: None,
3140 }));
3141
3142 let session_one = agent
3143 .session(
3144 "/tmp/test-workspace",
3145 Some(SessionOptions::new().with_skill_registry(session_registry)),
3146 )
3147 .unwrap();
3148 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3149
3150 assert!(session_one
3151 .config
3152 .skill_registry
3153 .as_ref()
3154 .unwrap()
3155 .get("session-only")
3156 .is_some());
3157 assert!(session_two
3158 .config
3159 .skill_registry
3160 .as_ref()
3161 .unwrap()
3162 .get("session-only")
3163 .is_none());
3164 }
3165
3166 #[tokio::test]
3167 async fn test_plugin_skills_do_not_leak_across_sessions() {
3168 use crate::plugin::{Plugin, PluginContext};
3169 use crate::skills::{Skill, SkillKind, SkillRegistry};
3170 use crate::tools::ToolRegistry;
3171
3172 struct LeakyPlugin;
3173
3174 impl Plugin for LeakyPlugin {
3175 fn name(&self) -> &str {
3176 "leaky-plugin"
3177 }
3178
3179 fn version(&self) -> &str {
3180 "0.1.0"
3181 }
3182
3183 fn tool_names(&self) -> &[&str] {
3184 &[]
3185 }
3186
3187 fn load(
3188 &self,
3189 _registry: &Arc<ToolRegistry>,
3190 _ctx: &PluginContext,
3191 ) -> anyhow::Result<()> {
3192 Ok(())
3193 }
3194
3195 fn skills(&self) -> Vec<Arc<Skill>> {
3196 vec![Arc::new(Skill {
3197 name: "plugin-only".to_string(),
3198 description: "plugin only".to_string(),
3199 allowed_tools: None,
3200 disable_model_invocation: false,
3201 kind: SkillKind::Instruction,
3202 content: "plugin skill".to_string(),
3203 tags: vec![],
3204 version: None,
3205 })]
3206 }
3207 }
3208
3209 let mut agent = Agent::from_config(test_config()).await.unwrap();
3210 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3211
3212 let session_one = agent
3213 .session(
3214 "/tmp/test-workspace",
3215 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3216 )
3217 .unwrap();
3218 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3219
3220 assert!(session_one
3221 .config
3222 .skill_registry
3223 .as_ref()
3224 .unwrap()
3225 .get("plugin-only")
3226 .is_some());
3227 assert!(session_two
3228 .config
3229 .skill_registry
3230 .as_ref()
3231 .unwrap()
3232 .get("plugin-only")
3233 .is_none());
3234 }
3235
3236 #[tokio::test]
3237 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3238 use crate::skills::{Skill, SkillKind, SkillRegistry};
3239 use crate::subagent::AgentDefinition;
3240
3241 let mut agent = Agent::from_config(test_config()).await.unwrap();
3242 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3243
3244 let definition = AgentDefinition::new("reviewer", "Review code")
3245 .with_prompt("Agent definition prompt")
3246 .with_max_steps(7);
3247
3248 let session_registry = Arc::new(SkillRegistry::new());
3249 session_registry.register_unchecked(Arc::new(Skill {
3250 name: "agent-session-skill".to_string(),
3251 description: "agent session only".to_string(),
3252 allowed_tools: None,
3253 disable_model_invocation: false,
3254 kind: SkillKind::Instruction,
3255 content: "agent session content".to_string(),
3256 tags: vec![],
3257 version: None,
3258 }));
3259
3260 let session_one = agent
3261 .session_for_agent(
3262 "/tmp/test-workspace",
3263 &definition,
3264 Some(SessionOptions::new().with_skill_registry(session_registry)),
3265 )
3266 .unwrap();
3267 let session_two = agent
3268 .session_for_agent("/tmp/test-workspace", &definition, None)
3269 .unwrap();
3270
3271 assert_eq!(session_one.config.max_tool_rounds, 7);
3272 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3273 assert!(extra.contains("Agent definition prompt"));
3274 assert!(extra.contains("agent-session-skill"));
3275 assert!(session_one
3276 .config
3277 .skill_registry
3278 .as_ref()
3279 .unwrap()
3280 .get("agent-session-skill")
3281 .is_some());
3282 assert!(session_two
3283 .config
3284 .skill_registry
3285 .as_ref()
3286 .unwrap()
3287 .get("agent-session-skill")
3288 .is_none());
3289 }
3290
3291 #[tokio::test]
3292 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3293 ) {
3294 use crate::prompts::SystemPromptSlots;
3295 use crate::subagent::AgentDefinition;
3296
3297 let agent = Agent::from_config(test_config()).await.unwrap();
3298 let definition = AgentDefinition::new("planner", "Plan work")
3299 .with_prompt("Definition extra prompt")
3300 .with_max_steps(3);
3301
3302 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3303 style: None,
3304 role: Some("Custom role".to_string()),
3305 guidelines: None,
3306 response_style: None,
3307 extra: None,
3308 });
3309
3310 let session = agent
3311 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3312 .unwrap();
3313
3314 assert_eq!(
3315 session.config.prompt_slots.role.as_deref(),
3316 Some("Custom role")
3317 );
3318 assert!(session
3319 .config
3320 .prompt_slots
3321 .extra
3322 .as_deref()
3323 .unwrap()
3324 .contains("Definition extra prompt"));
3325 assert_eq!(session.config.max_tool_rounds, 3);
3326 }
3327
3328 #[tokio::test]
3329 async fn test_new_with_acl_string() {
3330 let acl = r#"
3331 default_model "anthropic/claude-sonnet-4-20250514"
3332 providers "anthropic" {
3333 apiKey = "test-key"
3334 models "claude-sonnet-4-20250514" {
3335 name = "Claude Sonnet 4"
3336 }
3337 }
3338 "#;
3339 let agent = Agent::new(acl).await;
3340 assert!(agent.is_ok());
3341 }
3342
3343 #[tokio::test]
3344 async fn test_create_alias_acl() {
3345 let acl = r#"
3346 default_model "anthropic/claude-sonnet-4-20250514"
3347 providers "anthropic" {
3348 apiKey = "test-key"
3349 models "claude-sonnet-4-20250514" {
3350 name = "Claude Sonnet 4"
3351 }
3352 }
3353 "#;
3354 let agent = Agent::create(acl).await;
3355 assert!(agent.is_ok());
3356 }
3357
3358 #[tokio::test]
3359 async fn test_create_and_new_produce_same_result() {
3360 let acl = r#"
3361 default_model "anthropic/claude-sonnet-4-20250514"
3362 providers "anthropic" {
3363 apiKey = "test-key"
3364 models "claude-sonnet-4-20250514" {
3365 name = "Claude Sonnet 4"
3366 }
3367 }
3368 "#;
3369 let agent_new = Agent::new(acl).await;
3370 let agent_create = Agent::create(acl).await;
3371 assert!(agent_new.is_ok());
3372 assert!(agent_create.is_ok());
3373
3374 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3376 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3377 assert!(session_new.is_ok());
3378 assert!(session_create.is_ok());
3379 }
3380
3381 #[tokio::test]
3382 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3383 let temp_dir = tempfile::tempdir().unwrap();
3384 let config_path = temp_dir.path().join("agent.hcl");
3385 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3386
3387 let err = Agent::new(config_path.display().to_string())
3388 .await
3389 .unwrap_err();
3390 let msg = err.to_string();
3391
3392 assert!(msg.contains("Failed to load config"));
3393 assert!(msg.contains("agent.hcl"));
3394 assert!(!msg.contains("Failed to parse config as HCL string"));
3395 }
3396
3397 #[tokio::test]
3398 async fn test_new_with_missing_hcl_file_reports_not_found() {
3399 let temp_dir = tempfile::tempdir().unwrap();
3400 let missing_path = temp_dir.path().join("agent.hcl");
3401
3402 let err = Agent::new(missing_path.display().to_string())
3403 .await
3404 .unwrap_err();
3405 let msg = err.to_string();
3406
3407 assert!(msg.contains("Config file not found"));
3408 assert!(msg.contains("agent.hcl"));
3409 assert!(!msg.contains("Failed to parse config as HCL string"));
3410 }
3411
3412 #[test]
3413 fn test_from_config_requires_default_model() {
3414 let rt = tokio::runtime::Runtime::new().unwrap();
3415 let config = CodeConfig {
3416 providers: vec![ProviderConfig {
3417 name: "anthropic".to_string(),
3418 api_key: Some("test-key".to_string()),
3419 base_url: None,
3420 headers: std::collections::HashMap::new(),
3421 session_id_header: None,
3422 models: vec![],
3423 }],
3424 ..Default::default()
3425 };
3426 let result = rt.block_on(Agent::from_config(config));
3427 assert!(result.is_err());
3428 }
3429
3430 #[tokio::test]
3431 async fn test_history_empty_on_new_session() {
3432 let agent = Agent::from_config(test_config()).await.unwrap();
3433 let session = agent.session("/tmp/test-workspace", None).unwrap();
3434 assert!(session.history().is_empty());
3435 }
3436
3437 #[tokio::test]
3438 async fn test_session_options_with_agent_dir() {
3439 let opts = SessionOptions::new()
3440 .with_agent_dir("/tmp/agents")
3441 .with_agent_dir("/tmp/more-agents");
3442 assert_eq!(opts.agent_dirs.len(), 2);
3443 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3444 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3445 }
3446
3447 #[test]
3452 fn test_session_options_with_queue_config() {
3453 let qc = SessionQueueConfig::default().with_lane_features();
3454 let opts = SessionOptions::new().with_queue_config(qc.clone());
3455 assert!(opts.queue_config.is_some());
3456
3457 let config = opts.queue_config.unwrap();
3458 assert!(config.enable_dlq);
3459 assert!(config.enable_metrics);
3460 assert!(config.enable_alerts);
3461 assert_eq!(config.default_timeout_ms, Some(60_000));
3462 }
3463
3464 #[tokio::test(flavor = "multi_thread")]
3465 async fn test_session_with_queue_config() {
3466 let agent = Agent::from_config(test_config()).await.unwrap();
3467 let qc = SessionQueueConfig::default();
3468 let opts = SessionOptions::new().with_queue_config(qc);
3469 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3470 assert!(session.is_ok());
3471 let session = session.unwrap();
3472 assert!(session.has_queue());
3473 }
3474
3475 #[tokio::test]
3476 async fn test_session_without_queue_config() {
3477 let agent = Agent::from_config(test_config()).await.unwrap();
3478 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3479 assert!(!session.has_queue());
3480 }
3481
3482 #[tokio::test]
3483 async fn test_session_queue_stats_without_queue() {
3484 let agent = Agent::from_config(test_config()).await.unwrap();
3485 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3486 let stats = session.queue_stats().await;
3487 assert_eq!(stats.total_pending, 0);
3489 assert_eq!(stats.total_active, 0);
3490 }
3491
3492 #[tokio::test(flavor = "multi_thread")]
3493 async fn test_session_queue_stats_with_queue() {
3494 let agent = Agent::from_config(test_config()).await.unwrap();
3495 let qc = SessionQueueConfig::default();
3496 let opts = SessionOptions::new().with_queue_config(qc);
3497 let session = agent
3498 .session("/tmp/test-workspace-qstats", Some(opts))
3499 .unwrap();
3500 let stats = session.queue_stats().await;
3501 assert_eq!(stats.total_pending, 0);
3503 assert_eq!(stats.total_active, 0);
3504 }
3505
3506 #[tokio::test(flavor = "multi_thread")]
3507 async fn test_session_pending_external_tasks_empty() {
3508 let agent = Agent::from_config(test_config()).await.unwrap();
3509 let qc = SessionQueueConfig::default();
3510 let opts = SessionOptions::new().with_queue_config(qc);
3511 let session = agent
3512 .session("/tmp/test-workspace-ext", Some(opts))
3513 .unwrap();
3514 let tasks = session.pending_external_tasks().await;
3515 assert!(tasks.is_empty());
3516 }
3517
3518 #[tokio::test(flavor = "multi_thread")]
3519 async fn test_session_dead_letters_empty() {
3520 let agent = Agent::from_config(test_config()).await.unwrap();
3521 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3522 let opts = SessionOptions::new().with_queue_config(qc);
3523 let session = agent
3524 .session("/tmp/test-workspace-dlq", Some(opts))
3525 .unwrap();
3526 let dead = session.dead_letters().await;
3527 assert!(dead.is_empty());
3528 }
3529
3530 #[tokio::test(flavor = "multi_thread")]
3531 async fn test_session_queue_metrics_disabled() {
3532 let agent = Agent::from_config(test_config()).await.unwrap();
3533 let qc = SessionQueueConfig::default();
3535 let opts = SessionOptions::new().with_queue_config(qc);
3536 let session = agent
3537 .session("/tmp/test-workspace-nomet", Some(opts))
3538 .unwrap();
3539 let metrics = session.queue_metrics().await;
3540 assert!(metrics.is_none());
3541 }
3542
3543 #[tokio::test(flavor = "multi_thread")]
3544 async fn test_session_queue_metrics_enabled() {
3545 let agent = Agent::from_config(test_config()).await.unwrap();
3546 let qc = SessionQueueConfig::default().with_metrics();
3547 let opts = SessionOptions::new().with_queue_config(qc);
3548 let session = agent
3549 .session("/tmp/test-workspace-met", Some(opts))
3550 .unwrap();
3551 let metrics = session.queue_metrics().await;
3552 assert!(metrics.is_some());
3553 }
3554
3555 #[tokio::test(flavor = "multi_thread")]
3556 async fn test_session_set_lane_handler() {
3557 let agent = Agent::from_config(test_config()).await.unwrap();
3558 let qc = SessionQueueConfig::default();
3559 let opts = SessionOptions::new().with_queue_config(qc);
3560 let session = agent
3561 .session("/tmp/test-workspace-handler", Some(opts))
3562 .unwrap();
3563
3564 session
3566 .set_lane_handler(
3567 SessionLane::Execute,
3568 LaneHandlerConfig {
3569 mode: crate::queue::TaskHandlerMode::External,
3570 timeout_ms: 30_000,
3571 },
3572 )
3573 .await;
3574
3575 }
3578
3579 #[tokio::test(flavor = "multi_thread")]
3584 async fn test_session_has_id() {
3585 let agent = Agent::from_config(test_config()).await.unwrap();
3586 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3587 assert!(!session.session_id().is_empty());
3589 assert_eq!(session.session_id().len(), 36); }
3591
3592 #[tokio::test(flavor = "multi_thread")]
3593 async fn test_session_explicit_id() {
3594 let agent = Agent::from_config(test_config()).await.unwrap();
3595 let opts = SessionOptions::new().with_session_id("my-session-42");
3596 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3597 assert_eq!(session.session_id(), "my-session-42");
3598 }
3599
3600 #[tokio::test(flavor = "multi_thread")]
3601 async fn test_session_save_no_store() {
3602 let agent = Agent::from_config(test_config()).await.unwrap();
3603 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3604 session.save().await.unwrap();
3606 }
3607
3608 #[tokio::test(flavor = "multi_thread")]
3609 async fn test_session_save_and_load() {
3610 let store = Arc::new(crate::store::MemorySessionStore::new());
3611 let agent = Agent::from_config(test_config()).await.unwrap();
3612
3613 let opts = SessionOptions::new()
3614 .with_session_store(store.clone())
3615 .with_session_id("persist-test");
3616 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3617
3618 session.save().await.unwrap();
3620
3621 assert!(store.exists("persist-test").await.unwrap());
3623
3624 let data = store.load("persist-test").await.unwrap().unwrap();
3625 assert_eq!(data.id, "persist-test");
3626 assert!(data.messages.is_empty());
3627 }
3628
3629 #[tokio::test(flavor = "multi_thread")]
3630 async fn test_session_save_with_history() {
3631 let store = Arc::new(crate::store::MemorySessionStore::new());
3632 let agent = Agent::from_config(test_config()).await.unwrap();
3633
3634 let opts = SessionOptions::new()
3635 .with_session_store(store.clone())
3636 .with_session_id("history-test");
3637 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3638
3639 {
3641 let mut h = session.history.write().unwrap();
3642 h.push(Message::user("Hello"));
3643 h.push(Message::user("How are you?"));
3644 }
3645
3646 session.save().await.unwrap();
3647
3648 let data = store.load("history-test").await.unwrap().unwrap();
3649 assert_eq!(data.messages.len(), 2);
3650 }
3651
3652 #[tokio::test(flavor = "multi_thread")]
3653 async fn test_resume_session() {
3654 let store = Arc::new(crate::store::MemorySessionStore::new());
3655 let agent = Agent::from_config(test_config()).await.unwrap();
3656
3657 let opts = SessionOptions::new()
3659 .with_session_store(store.clone())
3660 .with_session_id("resume-test");
3661 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3662 {
3663 let mut h = session.history.write().unwrap();
3664 h.push(Message::user("What is Rust?"));
3665 h.push(Message::user("Tell me more"));
3666 }
3667 session.save().await.unwrap();
3668
3669 let opts2 = SessionOptions::new().with_session_store(store.clone());
3671 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3672
3673 assert_eq!(resumed.session_id(), "resume-test");
3674 let history = resumed.history();
3675 assert_eq!(history.len(), 2);
3676 assert_eq!(history[0].text(), "What is Rust?");
3677 }
3678
3679 #[tokio::test(flavor = "multi_thread")]
3680 async fn test_resume_session_not_found() {
3681 let store = Arc::new(crate::store::MemorySessionStore::new());
3682 let agent = Agent::from_config(test_config()).await.unwrap();
3683
3684 let opts = SessionOptions::new().with_session_store(store.clone());
3685 let result = agent.resume_session("nonexistent", opts);
3686 assert!(result.is_err());
3687 assert!(result.unwrap_err().to_string().contains("not found"));
3688 }
3689
3690 #[tokio::test(flavor = "multi_thread")]
3691 async fn test_resume_session_no_store() {
3692 let agent = Agent::from_config(test_config()).await.unwrap();
3693 let opts = SessionOptions::new();
3694 let result = agent.resume_session("any-id", opts);
3695 assert!(result.is_err());
3696 assert!(result.unwrap_err().to_string().contains("session_store"));
3697 }
3698
3699 #[tokio::test(flavor = "multi_thread")]
3700 async fn test_file_session_store_persistence() {
3701 let dir = tempfile::TempDir::new().unwrap();
3702 let store = Arc::new(
3703 crate::store::FileSessionStore::new(dir.path())
3704 .await
3705 .unwrap(),
3706 );
3707 let agent = Agent::from_config(test_config()).await.unwrap();
3708
3709 let opts = SessionOptions::new()
3711 .with_session_store(store.clone())
3712 .with_session_id("file-persist");
3713 let session = agent
3714 .session("/tmp/test-ws-file-persist", Some(opts))
3715 .unwrap();
3716 {
3717 let mut h = session.history.write().unwrap();
3718 h.push(Message::user("test message"));
3719 }
3720 session.save().await.unwrap();
3721
3722 let store2 = Arc::new(
3724 crate::store::FileSessionStore::new(dir.path())
3725 .await
3726 .unwrap(),
3727 );
3728 let data = store2.load("file-persist").await.unwrap().unwrap();
3729 assert_eq!(data.messages.len(), 1);
3730 }
3731
3732 #[tokio::test(flavor = "multi_thread")]
3733 async fn test_session_options_builders() {
3734 let opts = SessionOptions::new()
3735 .with_session_id("test-id")
3736 .with_auto_save(true);
3737 assert_eq!(opts.session_id, Some("test-id".to_string()));
3738 assert!(opts.auto_save);
3739 }
3740
3741 #[test]
3746 fn test_session_options_with_sandbox_sets_config() {
3747 use crate::sandbox::SandboxConfig;
3748 let cfg = SandboxConfig {
3749 image: "ubuntu:22.04".into(),
3750 memory_mb: 1024,
3751 ..SandboxConfig::default()
3752 };
3753 let opts = SessionOptions::new().with_sandbox(cfg);
3754 assert!(opts.sandbox_config.is_some());
3755 let sc = opts.sandbox_config.unwrap();
3756 assert_eq!(sc.image, "ubuntu:22.04");
3757 assert_eq!(sc.memory_mb, 1024);
3758 }
3759
3760 #[test]
3761 fn test_session_options_default_has_no_sandbox() {
3762 let opts = SessionOptions::default();
3763 assert!(opts.sandbox_config.is_none());
3764 }
3765
3766 #[tokio::test]
3767 async fn test_session_debug_includes_sandbox_config() {
3768 use crate::sandbox::SandboxConfig;
3769 let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
3770 let debug = format!("{:?}", opts);
3771 assert!(debug.contains("sandbox_config"));
3772 }
3773
3774 #[tokio::test]
3775 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3776 let agent = Agent::from_config(test_config()).await.unwrap();
3779 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3780 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3782 assert!(session.is_ok());
3783 }
3784
3785 #[tokio::test(flavor = "multi_thread")]
3790 async fn test_session_with_memory_store() {
3791 use a3s_memory::InMemoryStore;
3792 let store = Arc::new(InMemoryStore::new());
3793 let agent = Agent::from_config(test_config()).await.unwrap();
3794 let opts = SessionOptions::new().with_memory(store);
3795 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3796 assert!(session.memory().is_some());
3797 }
3798
3799 #[tokio::test(flavor = "multi_thread")]
3800 async fn test_session_without_memory_store() {
3801 let agent = Agent::from_config(test_config()).await.unwrap();
3802 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3803 assert!(session.memory().is_none());
3804 }
3805
3806 #[tokio::test(flavor = "multi_thread")]
3807 async fn test_session_memory_wired_into_config() {
3808 use a3s_memory::InMemoryStore;
3809 let store = Arc::new(InMemoryStore::new());
3810 let agent = Agent::from_config(test_config()).await.unwrap();
3811 let opts = SessionOptions::new().with_memory(store);
3812 let session = agent
3813 .session("/tmp/test-ws-mem-config", Some(opts))
3814 .unwrap();
3815 assert!(session.memory().is_some());
3817 }
3818
3819 #[tokio::test(flavor = "multi_thread")]
3820 async fn test_session_with_file_memory() {
3821 let dir = tempfile::TempDir::new().unwrap();
3822 let agent = Agent::from_config(test_config()).await.unwrap();
3823 let opts = SessionOptions::new().with_file_memory(dir.path());
3824 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3825 assert!(session.memory().is_some());
3826 }
3827
3828 #[tokio::test(flavor = "multi_thread")]
3829 async fn test_memory_remember_and_recall() {
3830 use a3s_memory::InMemoryStore;
3831 let store = Arc::new(InMemoryStore::new());
3832 let agent = Agent::from_config(test_config()).await.unwrap();
3833 let opts = SessionOptions::new().with_memory(store);
3834 let session = agent
3835 .session("/tmp/test-ws-mem-recall", Some(opts))
3836 .unwrap();
3837
3838 let memory = session.memory().unwrap();
3839 memory
3840 .remember_success("write a file", &["write".to_string()], "done")
3841 .await
3842 .unwrap();
3843
3844 let results = memory.recall_similar("write", 5).await.unwrap();
3845 assert!(!results.is_empty());
3846 let stats = memory.stats().await.unwrap();
3847 assert_eq!(stats.long_term_count, 1);
3848 }
3849
3850 #[tokio::test(flavor = "multi_thread")]
3855 async fn test_session_tool_timeout_configured() {
3856 let agent = Agent::from_config(test_config()).await.unwrap();
3857 let opts = SessionOptions::new().with_tool_timeout(5000);
3858 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3859 assert!(!session.id().is_empty());
3860 }
3861
3862 #[tokio::test(flavor = "multi_thread")]
3867 async fn test_session_without_queue_builds_ok() {
3868 let agent = Agent::from_config(test_config()).await.unwrap();
3869 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
3870 assert!(!session.id().is_empty());
3871 }
3872
3873 #[tokio::test(flavor = "multi_thread")]
3878 async fn test_concurrent_history_reads() {
3879 let agent = Agent::from_config(test_config()).await.unwrap();
3880 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
3881
3882 let handles: Vec<_> = (0..10)
3883 .map(|_| {
3884 let s = Arc::clone(&session);
3885 tokio::spawn(async move { s.history().len() })
3886 })
3887 .collect();
3888
3889 for h in handles {
3890 h.await.unwrap();
3891 }
3892 }
3893
3894 #[tokio::test(flavor = "multi_thread")]
3899 async fn test_session_no_init_warning_without_file_memory() {
3900 let agent = Agent::from_config(test_config()).await.unwrap();
3901 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
3902 assert!(session.init_warning().is_none());
3903 }
3904
3905 #[tokio::test(flavor = "multi_thread")]
3906 async fn test_register_agent_dir_loads_agents_into_live_session() {
3907 let temp_dir = tempfile::tempdir().unwrap();
3908
3909 std::fs::write(
3911 temp_dir.path().join("my-agent.yaml"),
3912 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
3913 )
3914 .unwrap();
3915
3916 let agent = Agent::from_config(test_config()).await.unwrap();
3917 let session = agent.session(".", None).unwrap();
3918
3919 assert!(!session.agent_registry.exists("my-dynamic-agent"));
3921
3922 let count = session.register_agent_dir(temp_dir.path());
3923 assert_eq!(count, 1);
3924 assert!(session.agent_registry.exists("my-dynamic-agent"));
3925 }
3926
3927 #[tokio::test(flavor = "multi_thread")]
3928 async fn test_register_agent_dir_empty_dir_returns_zero() {
3929 let temp_dir = tempfile::tempdir().unwrap();
3930 let agent = Agent::from_config(test_config()).await.unwrap();
3931 let session = agent.session(".", None).unwrap();
3932 let count = session.register_agent_dir(temp_dir.path());
3933 assert_eq!(count, 0);
3934 }
3935
3936 #[tokio::test(flavor = "multi_thread")]
3937 async fn test_register_agent_dir_nonexistent_returns_zero() {
3938 let agent = Agent::from_config(test_config()).await.unwrap();
3939 let session = agent.session(".", None).unwrap();
3940 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
3941 assert_eq!(count, 0);
3942 }
3943
3944 #[tokio::test(flavor = "multi_thread")]
3945 async fn test_session_with_mcp_manager_builds_ok() {
3946 use crate::mcp::manager::McpManager;
3947 let mcp = Arc::new(McpManager::new());
3948 let agent = Agent::from_config(test_config()).await.unwrap();
3949 let opts = SessionOptions::new().with_mcp(mcp);
3950 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
3952 assert!(!session.id().is_empty());
3953 }
3954
3955 #[test]
3956 fn test_session_command_is_pub() {
3957 use crate::SessionCommand;
3959 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
3960 }
3961
3962 #[tokio::test(flavor = "multi_thread")]
3963 async fn test_session_submit_with_queue_executes() {
3964 let agent = Agent::from_config(test_config()).await.unwrap();
3965 let qc = SessionQueueConfig::default();
3966 let opts = SessionOptions::new().with_queue_config(qc);
3967 let session = agent
3968 .session("/tmp/test-ws-submit-exec", Some(opts))
3969 .unwrap();
3970
3971 struct Echo(serde_json::Value);
3972 #[async_trait::async_trait]
3973 impl crate::queue::SessionCommand for Echo {
3974 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
3975 Ok(self.0.clone())
3976 }
3977 fn command_type(&self) -> &str {
3978 "echo"
3979 }
3980 }
3981
3982 let rx = session
3983 .submit(
3984 SessionLane::Query,
3985 Box::new(Echo(serde_json::json!({"ok": true}))),
3986 )
3987 .await
3988 .expect("submit should succeed with queue configured");
3989
3990 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
3991 .await
3992 .expect("timed out waiting for command result")
3993 .expect("channel closed before result")
3994 .expect("command returned an error");
3995
3996 assert_eq!(result["ok"], true);
3997 }
3998}