1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandAction, CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::context::{ContextItem, ContextType, StaticContextProvider};
23use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
24use crate::hitl::PendingConfirmationInfo;
25use crate::llm::{LlmClient, Message};
26use crate::prompts::{PlanningMode, SystemPromptSlots};
27use crate::queue::{
28 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
29 SessionQueueStats,
30};
31use crate::session_lane_queue::SessionLaneQueue;
32use crate::text::truncate_utf8;
33use crate::tools::{ToolContext, ToolExecutor};
34use a3s_lane::{DeadLetter, MetricsSnapshot};
35use a3s_memory::{FileMemoryStore, MemoryStore};
36use anyhow::Context;
37use std::collections::HashMap;
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, RwLock};
40use tokio::sync::{broadcast, mpsc};
41use tokio::task::JoinHandle;
42
43fn safe_canonicalize(path: &Path) -> PathBuf {
46 match std::fs::canonicalize(path) {
47 Ok(p) => strip_unc_prefix(p),
48 Err(_) => path.to_path_buf(),
49 }
50}
51
52fn strip_unc_prefix(path: PathBuf) -> PathBuf {
55 #[cfg(windows)]
56 {
57 let s = path.to_string_lossy();
58 if let Some(stripped) = s.strip_prefix(r"\\?\") {
59 return PathBuf::from(stripped);
60 }
61 }
62 path
63}
64
65#[derive(Debug, Clone)]
71pub struct ToolCallResult {
72 pub name: String,
73 pub output: String,
74 pub exit_code: i32,
75 pub metadata: Option<serde_json::Value>,
76}
77
78#[derive(Clone, Default)]
84pub struct SessionOptions {
85 pub model: Option<String>,
87 pub agent_dirs: Vec<PathBuf>,
90 pub queue_config: Option<SessionQueueConfig>,
95 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
97 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
99 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
101 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
103 pub planning_mode: PlanningMode,
105 pub goal_tracking: bool,
107 pub skill_dirs: Vec<PathBuf>,
110 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
112 pub memory_store: Option<Arc<dyn MemoryStore>>,
114 pub(crate) file_memory_dir: Option<PathBuf>,
116 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
118 pub session_id: Option<String>,
120 pub auto_save: bool,
122 pub artifact_store_limits: Option<crate::tools::ArtifactStoreLimits>,
124 pub max_parse_retries: Option<u32>,
127 pub tool_timeout_ms: Option<u64>,
130 pub circuit_breaker_threshold: Option<u32>,
134 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
140 pub auto_compact: bool,
142 pub auto_compact_threshold: Option<f32>,
145 pub continuation_enabled: Option<bool>,
148 pub max_continuation_turns: Option<u32>,
151 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
156 pub temperature: Option<f32>,
158 pub thinking_budget: Option<usize>,
160 pub max_tool_rounds: Option<usize>,
166 pub prompt_slots: Option<SystemPromptSlots>,
172 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
179 pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
188}
189
190impl std::fmt::Debug for SessionOptions {
191 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
192 f.debug_struct("SessionOptions")
193 .field("model", &self.model)
194 .field("agent_dirs", &self.agent_dirs)
195 .field("skill_dirs", &self.skill_dirs)
196 .field("queue_config", &self.queue_config)
197 .field("security_provider", &self.security_provider.is_some())
198 .field("context_providers", &self.context_providers.len())
199 .field("confirmation_manager", &self.confirmation_manager.is_some())
200 .field("permission_checker", &self.permission_checker.is_some())
201 .field("planning_mode", &self.planning_mode)
202 .field("goal_tracking", &self.goal_tracking)
203 .field(
204 "skill_registry",
205 &self
206 .skill_registry
207 .as_ref()
208 .map(|r| format!("{} skills", r.len())),
209 )
210 .field("memory_store", &self.memory_store.is_some())
211 .field("session_store", &self.session_store.is_some())
212 .field("session_id", &self.session_id)
213 .field("auto_save", &self.auto_save)
214 .field("artifact_store_limits", &self.artifact_store_limits)
215 .field("max_parse_retries", &self.max_parse_retries)
216 .field("tool_timeout_ms", &self.tool_timeout_ms)
217 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
218 .field("sandbox_handle", &self.sandbox_handle.is_some())
219 .field("auto_compact", &self.auto_compact)
220 .field("auto_compact_threshold", &self.auto_compact_threshold)
221 .field("continuation_enabled", &self.continuation_enabled)
222 .field("max_continuation_turns", &self.max_continuation_turns)
223 .field(
224 "plugins",
225 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
226 )
227 .field("mcp_manager", &self.mcp_manager.is_some())
228 .field("temperature", &self.temperature)
229 .field("thinking_budget", &self.thinking_budget)
230 .field("max_tool_rounds", &self.max_tool_rounds)
231 .field("prompt_slots", &self.prompt_slots.is_some())
232 .finish()
233 }
234}
235
236impl SessionOptions {
237 pub fn new() -> Self {
238 Self::default()
239 }
240
241 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
246 self.plugins.push(std::sync::Arc::new(plugin));
247 self
248 }
249
250 pub fn with_model(mut self, model: impl Into<String>) -> Self {
251 self.model = Some(model.into());
252 self
253 }
254
255 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
256 self.agent_dirs.push(dir.into());
257 self
258 }
259
260 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
261 self.queue_config = Some(config);
262 self
263 }
264
265 pub fn with_default_security(mut self) -> Self {
267 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
268 self
269 }
270
271 pub fn with_security_provider(
273 mut self,
274 provider: Arc<dyn crate::security::SecurityProvider>,
275 ) -> Self {
276 self.security_provider = Some(provider);
277 self
278 }
279
280 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
282 let config = crate::context::FileSystemContextConfig::new(root_path);
283 self.context_providers
284 .push(Arc::new(crate::context::FileSystemContextProvider::new(
285 config,
286 )));
287 self
288 }
289
290 pub fn with_context_provider(
292 mut self,
293 provider: Arc<dyn crate::context::ContextProvider>,
294 ) -> Self {
295 self.context_providers.push(provider);
296 self
297 }
298
299 pub fn with_confirmation_manager(
301 mut self,
302 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
303 ) -> Self {
304 self.confirmation_manager = Some(manager);
305 self
306 }
307
308 pub fn with_permission_checker(
310 mut self,
311 checker: Arc<dyn crate::permissions::PermissionChecker>,
312 ) -> Self {
313 self.permission_checker = Some(checker);
314 self
315 }
316
317 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
319 self.planning_mode = mode;
320 self
321 }
322
323 pub fn with_planning(mut self, enabled: bool) -> Self {
325 self.planning_mode = if enabled {
326 PlanningMode::Enabled
327 } else {
328 PlanningMode::Disabled
329 };
330 self
331 }
332
333 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
335 self.goal_tracking = enabled;
336 self
337 }
338
339 pub fn with_builtin_skills(mut self) -> Self {
341 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
342 self
343 }
344
345 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
347 self.skill_registry = Some(registry);
348 self
349 }
350
351 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
354 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
355 self
356 }
357
358 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
360 let registry = self
361 .skill_registry
362 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
363 if let Err(e) = registry.load_from_dir(&dir) {
364 tracing::warn!(
365 dir = %dir.as_ref().display(),
366 error = %e,
367 "Failed to load skills from directory — continuing without them"
368 );
369 }
370 self.skill_registry = Some(registry);
371 self
372 }
373
374 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
376 self.memory_store = Some(store);
377 self
378 }
379
380 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
386 self.file_memory_dir = Some(dir.into());
387 self
388 }
389
390 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
392 self.session_store = Some(store);
393 self
394 }
395
396 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
398 let dir = dir.into();
399 match tokio::runtime::Handle::try_current() {
400 Ok(handle) => {
401 match tokio::task::block_in_place(|| {
402 handle.block_on(crate::store::FileSessionStore::new(dir))
403 }) {
404 Ok(store) => {
405 self.session_store =
406 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
407 }
408 Err(e) => {
409 tracing::warn!("Failed to create file session store: {}", e);
410 }
411 }
412 }
413 Err(_) => {
414 tracing::warn!(
415 "No async runtime available for file session store — persistence disabled"
416 );
417 }
418 }
419 self
420 }
421
422 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
424 self.session_id = Some(id.into());
425 self
426 }
427
428 pub fn with_auto_save(mut self, enabled: bool) -> Self {
430 self.auto_save = enabled;
431 self
432 }
433
434 pub fn with_artifact_store_limits(mut self, limits: crate::tools::ArtifactStoreLimits) -> Self {
436 self.artifact_store_limits = Some(limits);
437 self
438 }
439
440 pub fn with_parse_retries(mut self, max: u32) -> Self {
446 self.max_parse_retries = Some(max);
447 self
448 }
449
450 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
456 self.tool_timeout_ms = Some(timeout_ms);
457 self
458 }
459
460 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
466 self.circuit_breaker_threshold = Some(threshold);
467 self
468 }
469
470 pub fn with_resilience_defaults(self) -> Self {
476 self.with_parse_retries(2)
477 .with_tool_timeout(120_000)
478 .with_circuit_breaker(3)
479 }
480
481 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
489 self.sandbox_handle = Some(handle);
490 self
491 }
492
493 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
498 self.auto_compact = enabled;
499 self
500 }
501
502 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
504 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
505 self
506 }
507
508 pub fn with_continuation(mut self, enabled: bool) -> Self {
513 self.continuation_enabled = Some(enabled);
514 self
515 }
516
517 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
519 self.max_continuation_turns = Some(turns);
520 self
521 }
522
523 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
528 self.mcp_manager = Some(manager);
529 self
530 }
531
532 pub fn with_temperature(mut self, temperature: f32) -> Self {
533 self.temperature = Some(temperature);
534 self
535 }
536
537 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
538 self.thinking_budget = Some(budget);
539 self
540 }
541
542 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
547 self.max_tool_rounds = Some(rounds);
548 self
549 }
550
551 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
556 self.prompt_slots = Some(slots);
557 self
558 }
559
560 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
566 self.hook_executor = Some(executor);
567 self
568 }
569}
570
571pub struct Agent {
580 llm_client: Arc<dyn LlmClient>,
581 code_config: CodeConfig,
582 config: AgentConfig,
583 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
585 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
588}
589
590impl std::fmt::Debug for Agent {
591 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
592 f.debug_struct("Agent").finish()
593 }
594}
595
596impl Agent {
597 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
601 let source = config_source.into();
602
603 let expanded = if let Some(rest) = source.strip_prefix("~/") {
605 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
606 if let Some(home) = home {
607 PathBuf::from(home).join(rest).display().to_string()
608 } else {
609 source.clone()
610 }
611 } else {
612 source.clone()
613 };
614
615 let path = Path::new(&expanded);
616
617 let ext = path.extension().and_then(|ext| ext.to_str());
618
619 let config = if matches!(ext, Some("acl")) {
620 if !path.exists() {
621 return Err(CodeError::Config(format!(
622 "Config file not found: {}",
623 path.display()
624 )));
625 }
626
627 CodeConfig::from_file(path)
628 .with_context(|| format!("Failed to load config: {}", path.display()))?
629 } else if matches!(ext, Some("hcl")) {
630 return Err(CodeError::Config(
631 "HCL config files are not supported in 2.0; rename the file to .acl".into(),
632 ));
633 } else if source.trim().starts_with('{') {
634 return Err(CodeError::Config(
635 "JSON config is not supported; use ACL-compatible .acl config".into(),
636 ));
637 } else if matches!(ext, Some("json")) {
638 return Err(CodeError::Config(
639 "JSON config files are not supported; use .acl".into(),
640 ));
641 } else {
642 CodeConfig::from_acl(&source).context("Failed to parse config as ACL string")?
643 };
644
645 Self::from_config(config).await
646 }
647
648 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
653 Self::new(config_source).await
654 }
655
656 pub async fn from_config(config: CodeConfig) -> Result<Self> {
658 let llm_config = config
659 .default_llm_config()
660 .context("default_model must be set in 'provider/model' format with a valid API key")?;
661 let llm_client = crate::llm::create_client_with_config(llm_config);
662
663 let agent_config = AgentConfig {
664 max_tool_rounds: config
665 .max_tool_rounds
666 .unwrap_or(AgentConfig::default().max_tool_rounds),
667 ..AgentConfig::default()
668 };
669
670 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
672 (None, vec![])
673 } else {
674 let manager = Arc::new(crate::mcp::manager::McpManager::new());
675 for server in &config.mcp_servers {
676 if !server.enabled {
677 continue;
678 }
679 manager.register_server(server.clone()).await;
680 if let Err(e) = manager.connect(&server.name).await {
681 tracing::warn!(
682 server = %server.name,
683 error = %e,
684 "Failed to connect to MCP server — skipping"
685 );
686 }
687 }
688 let tools = manager.get_all_tools().await;
690 (Some(manager), tools)
691 };
692
693 let mut agent = Agent {
694 llm_client,
695 code_config: config,
696 config: agent_config,
697 global_mcp,
698 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
699 };
700
701 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
703 for dir in &agent.code_config.skill_dirs.clone() {
704 if let Err(e) = registry.load_from_dir(dir) {
705 tracing::warn!(
706 dir = %dir.display(),
707 error = %e,
708 "Failed to load skills from directory — skipping"
709 );
710 }
711 }
712 agent.config.skill_registry = Some(registry);
713
714 Ok(agent)
715 }
716
717 pub async fn refresh_mcp_tools(&self) -> Result<()> {
725 if let Some(ref mcp) = self.global_mcp {
726 let fresh = mcp.get_all_tools().await;
727 *self
728 .global_mcp_tools
729 .lock()
730 .expect("global_mcp_tools lock poisoned") = fresh;
731 }
732 Ok(())
733 }
734
735 pub fn session(
740 &self,
741 workspace: impl Into<String>,
742 options: Option<SessionOptions>,
743 ) -> Result<AgentSession> {
744 let opts = options.unwrap_or_default();
745
746 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
749 (Some(global), Some(session)) => {
750 let global = Arc::clone(global);
751 let session_mgr = Arc::clone(session);
752 match tokio::runtime::Handle::try_current() {
753 Ok(handle) => {
754 let global_for_merge = Arc::clone(&global);
755 tokio::task::block_in_place(|| {
756 handle.block_on(async move {
757 for config in session_mgr.all_configs().await {
758 let name = config.name.clone();
759 global_for_merge.register_server(config).await;
760 if let Err(e) = global_for_merge.connect(&name).await {
761 tracing::warn!(
762 server = %name,
763 error = %e,
764 "Failed to connect session-level MCP server — skipping"
765 );
766 }
767 }
768 })
769 });
770 }
771 Err(_) => {
772 tracing::warn!(
773 "No async runtime available to merge session-level MCP servers \
774 into global manager — session MCP servers will not be available"
775 );
776 }
777 }
778 SessionOptions {
779 mcp_manager: Some(Arc::clone(&global)),
780 ..opts
781 }
782 }
783 (Some(global), None) => SessionOptions {
784 mcp_manager: Some(Arc::clone(global)),
785 ..opts
786 },
787 _ => opts,
788 };
789
790 let session_id = merged_opts
791 .session_id
792 .clone()
793 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
794 merged_opts.session_id = Some(session_id.clone());
795 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
796
797 self.build_session(workspace.into(), llm_client, &merged_opts)
798 }
799
800 pub fn session_for_agent(
815 &self,
816 workspace: impl Into<String>,
817 def: &crate::subagent::AgentDefinition,
818 extra: Option<SessionOptions>,
819 ) -> Result<AgentSession> {
820 let mut opts = extra.unwrap_or_default();
821
822 if opts.permission_checker.is_none()
824 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
825 {
826 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
827 }
828
829 if opts.max_tool_rounds.is_none() {
831 if let Some(steps) = def.max_steps {
832 opts.max_tool_rounds = Some(steps);
833 }
834 }
835
836 if opts.model.is_none() {
838 if let Some(ref m) = def.model {
839 let provider = m.provider.as_deref().unwrap_or("anthropic");
840 opts.model = Some(format!("{}/{}", provider, m.model));
841 }
842 }
843
844 if let Some(ref prompt) = def.prompt {
851 let slots = opts
852 .prompt_slots
853 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
854 if slots.extra.is_none() {
855 slots.extra = Some(prompt.clone());
856 }
857 }
858
859 self.session(workspace, Some(opts))
860 }
861
862 pub fn resume_session(
870 &self,
871 session_id: &str,
872 options: SessionOptions,
873 ) -> Result<AgentSession> {
874 let store = options.session_store.clone().ok_or_else(|| {
875 crate::error::CodeError::Session(
876 "resume_session requires a session_store in SessionOptions".to_string(),
877 )
878 })?;
879
880 let data = match tokio::runtime::Handle::try_current() {
882 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
883 .map_err(|e| {
884 crate::error::CodeError::Session(format!(
885 "Failed to load session {}: {}",
886 session_id, e
887 ))
888 })?,
889 Err(_) => {
890 return Err(crate::error::CodeError::Session(
891 "No async runtime available for session resume".to_string(),
892 ))
893 }
894 };
895
896 let data = data.ok_or_else(|| {
897 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
898 })?;
899
900 let mut opts = options;
902 opts.session_id = Some(data.id.clone());
903 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
904
905 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
906
907 *write_or_recover(&session.history) = data.messages;
909 let artifacts = match tokio::runtime::Handle::try_current() {
910 Ok(handle) => {
911 tokio::task::block_in_place(|| handle.block_on(store.load_artifacts(&data.id)))
912 .map_err(|e| {
913 crate::error::CodeError::Session(format!(
914 "Failed to load artifacts for session {}: {}",
915 data.id, e
916 ))
917 })?
918 }
919 Err(_) => None,
920 };
921 if let Some(artifacts) = artifacts {
922 let target_store = session.tool_executor.artifact_store();
923 for artifact in artifacts.artifacts() {
924 target_store.put(artifact);
925 }
926 }
927
928 let trace_events = match tokio::runtime::Handle::try_current() {
929 Ok(handle) => {
930 tokio::task::block_in_place(|| handle.block_on(store.load_trace_events(&data.id)))
931 .map_err(|e| {
932 crate::error::CodeError::Session(format!(
933 "Failed to load trace events for session {}: {}",
934 data.id, e
935 ))
936 })?
937 }
938 Err(_) => None,
939 };
940 if let Some(events) = trace_events {
941 session.trace_sink.replace_events(events);
942 }
943
944 let verification_reports = match tokio::runtime::Handle::try_current() {
945 Ok(handle) => tokio::task::block_in_place(|| {
946 handle.block_on(store.load_verification_reports(&data.id))
947 })
948 .map_err(|e| {
949 crate::error::CodeError::Session(format!(
950 "Failed to load verification reports for session {}: {}",
951 data.id, e
952 ))
953 })?,
954 Err(_) => None,
955 };
956 if let Some(reports) = verification_reports {
957 *write_or_recover(&session.verification_reports) = reports;
958 }
959
960 Ok(session)
961 }
962
963 fn resolve_session_llm_client(
964 &self,
965 opts: &SessionOptions,
966 session_id: Option<&str>,
967 ) -> Result<Arc<dyn LlmClient>> {
968 let model_ref = if let Some(ref model) = opts.model {
969 model.as_str()
970 } else {
971 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
972 tracing::warn!(
973 "temperature/thinking_budget set without model override — these will be ignored. \
974 Use with_model() to apply LLM parameter overrides."
975 );
976 }
977 self.code_config
978 .default_model
979 .as_deref()
980 .context("default_model must be set in 'provider/model' format")?
981 };
982
983 let (provider_name, model_id) = model_ref
984 .split_once('/')
985 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
986
987 let mut llm_config = self
988 .code_config
989 .llm_config(provider_name, model_id)
990 .with_context(|| {
991 format!("provider '{provider_name}' or model '{model_id}' not found in config")
992 })?;
993
994 if opts.model.is_some() {
995 if let Some(temp) = opts.temperature {
996 llm_config = llm_config.with_temperature(temp);
997 }
998 if let Some(budget) = opts.thinking_budget {
999 llm_config = llm_config.with_thinking_budget(budget);
1000 }
1001 }
1002
1003 if let Some(session_id) = session_id {
1004 llm_config = llm_config.with_session_id(session_id);
1005 }
1006
1007 Ok(crate::llm::create_client_with_config(llm_config))
1008 }
1009
1010 fn build_session(
1011 &self,
1012 workspace: String,
1013 llm_client: Arc<dyn LlmClient>,
1014 opts: &SessionOptions,
1015 ) -> Result<AgentSession> {
1016 let canonical = safe_canonicalize(Path::new(&workspace));
1017
1018 let artifact_limits = opts.artifact_store_limits.unwrap_or_default();
1019 let tool_executor = Arc::new(ToolExecutor::new_with_artifact_limits(
1020 canonical.display().to_string(),
1021 artifact_limits,
1022 ));
1023 let trace_sink = crate::trace::InMemoryTraceSink::default();
1024 tool_executor.set_trace_sink(Arc::new(trace_sink.clone()));
1025
1026 if let Some(ref search_config) = self.code_config.search {
1028 tool_executor
1029 .registry()
1030 .set_search_config(search_config.clone());
1031 }
1032
1033 let agent_registry = {
1037 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1038 use crate::tools::register_task_with_mcp;
1039 let registry = AgentRegistry::new();
1040 for dir in self
1041 .code_config
1042 .agent_dirs
1043 .iter()
1044 .chain(opts.agent_dirs.iter())
1045 {
1046 for agent in load_agents_from_dir(dir) {
1047 registry.register(agent);
1048 }
1049 }
1050 let registry = Arc::new(registry);
1051 register_task_with_mcp(
1052 tool_executor.registry(),
1053 Arc::clone(&llm_client),
1054 Arc::clone(®istry),
1055 canonical.display().to_string(),
1056 opts.mcp_manager.clone(),
1057 );
1058 registry
1059 };
1060
1061 if let Some(ref mcp) = opts.mcp_manager {
1064 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1067 Arc::as_ptr(mcp),
1068 self.global_mcp
1069 .as_ref()
1070 .map(Arc::as_ptr)
1071 .unwrap_or(std::ptr::null()),
1072 ) {
1073 self.global_mcp_tools
1075 .lock()
1076 .expect("global_mcp_tools lock poisoned")
1077 .clone()
1078 } else {
1079 match tokio::runtime::Handle::try_current() {
1081 Ok(handle) => {
1082 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1083 }
1084 Err(_) => {
1085 tracing::warn!(
1086 "No async runtime available for session-level MCP tools — \
1087 MCP tools will not be registered"
1088 );
1089 vec![]
1090 }
1091 }
1092 };
1093
1094 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1095 std::collections::HashMap::new();
1096 for (server, tool) in all_tools {
1097 by_server.entry(server).or_default().push(tool);
1098 }
1099 for (server_name, tools) in by_server {
1100 for tool in
1101 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1102 {
1103 tool_executor.register_dynamic_tool(tool);
1104 }
1105 }
1106 }
1107
1108 let tool_defs = tool_executor.definitions();
1109
1110 let prompt_slots = opts
1112 .prompt_slots
1113 .clone()
1114 .unwrap_or_else(|| self.config.prompt_slots.clone());
1115
1116 let mut context_providers = opts.context_providers.clone();
1117
1118 let agents_md_path = canonical.join("AGENTS.md");
1120 if agents_md_path.exists() && agents_md_path.is_file() {
1121 match std::fs::read_to_string(&agents_md_path) {
1122 Ok(content) if !content.trim().is_empty() => {
1123 tracing::info!(
1124 path = %agents_md_path.display(),
1125 "Auto-loaded AGENTS.md from workspace root"
1126 );
1127 let token_count = content.split_whitespace().count().max(1);
1128 let item = ContextItem::new(
1129 "agents_md",
1130 ContextType::Resource,
1131 format!("# Project Instructions (AGENTS.md)\n\n{}", content),
1132 )
1133 .with_source(format!("file://{}", agents_md_path.display()))
1134 .with_provenance("workspace_instructions")
1135 .with_priority(0.95)
1136 .with_trust(0.95)
1137 .with_freshness(1.0)
1138 .with_relevance(0.95)
1139 .with_token_count(token_count);
1140
1141 context_providers.push(Arc::new(
1142 StaticContextProvider::new("agents_md").with_item(item),
1143 ));
1144 }
1145 Ok(_) => {
1146 tracing::debug!(
1147 path = %agents_md_path.display(),
1148 "AGENTS.md exists but is empty — skipping"
1149 );
1150 }
1151 Err(e) => {
1152 tracing::warn!(
1153 path = %agents_md_path.display(),
1154 error = %e,
1155 "Failed to read AGENTS.md — skipping"
1156 );
1157 }
1158 }
1159 }
1160
1161 let base_registry = self
1165 .config
1166 .skill_registry
1167 .as_deref()
1168 .map(|r| r.fork())
1169 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1170 if let Some(ref r) = opts.skill_registry {
1172 for skill in r.all() {
1173 base_registry.register_unchecked(skill);
1174 }
1175 }
1176 for dir in &opts.skill_dirs {
1178 if let Err(e) = base_registry.load_from_dir(dir) {
1179 tracing::warn!(
1180 dir = %dir.display(),
1181 error = %e,
1182 "Failed to load session skill dir — skipping"
1183 );
1184 }
1185 }
1186 let effective_registry = Arc::new(base_registry);
1187
1188 if !opts.plugins.is_empty() {
1191 use crate::plugin::PluginContext;
1192 let plugin_ctx = PluginContext::new()
1193 .with_llm(Arc::clone(&self.llm_client))
1194 .with_skill_registry(Arc::clone(&effective_registry));
1195 let plugin_registry = tool_executor.registry();
1196 for plugin in &opts.plugins {
1197 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1198 match plugin.load(plugin_registry, &plugin_ctx) {
1199 Ok(()) => {
1200 for skill in plugin.skills() {
1201 tracing::debug!(
1202 "Plugin '{}' registered skill '{}'",
1203 plugin.name(),
1204 skill.name
1205 );
1206 effective_registry.register_unchecked(skill);
1207 }
1208 }
1209 Err(e) => {
1210 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1211 }
1212 }
1213 }
1214 }
1215
1216 let skill_prompt = effective_registry.to_system_prompt();
1218 if !skill_prompt.is_empty() {
1219 let item = ContextItem::new("skills_catalog", ContextType::Skill, skill_prompt)
1220 .with_source("a3s://skills/catalog")
1221 .with_provenance("skill_registry")
1222 .with_priority(0.85)
1223 .with_trust(0.9)
1224 .with_freshness(1.0)
1225 .with_relevance(1.0);
1226 context_providers.push(Arc::new(
1227 StaticContextProvider::new("skills_catalog").with_item(item),
1228 ));
1229 }
1230
1231 let mut init_warning: Option<String> = None;
1233 let memory = {
1234 let store = if let Some(ref store) = opts.memory_store {
1235 Some(Arc::clone(store))
1236 } else if let Some(ref dir) = opts.file_memory_dir {
1237 match tokio::runtime::Handle::try_current() {
1238 Ok(handle) => {
1239 let dir = dir.clone();
1240 match tokio::task::block_in_place(|| {
1241 handle.block_on(FileMemoryStore::new(dir))
1242 }) {
1243 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1244 Err(e) => {
1245 let msg = format!("Failed to create file memory store: {}", e);
1246 tracing::warn!("{}", msg);
1247 init_warning = Some(msg);
1248 None
1249 }
1250 }
1251 }
1252 Err(_) => {
1253 let msg =
1254 "No async runtime available for file memory store — memory disabled"
1255 .to_string();
1256 tracing::warn!("{}", msg);
1257 init_warning = Some(msg);
1258 None
1259 }
1260 }
1261 } else {
1262 None
1263 };
1264 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1265 };
1266
1267 let base = self.config.clone();
1268 let config = AgentConfig {
1269 prompt_slots,
1270 tools: tool_defs,
1271 security_provider: opts.security_provider.clone(),
1272 permission_checker: opts.permission_checker.clone(),
1273 confirmation_manager: opts.confirmation_manager.clone(),
1274 context_providers,
1275 planning_mode: opts.planning_mode,
1276 goal_tracking: opts.goal_tracking,
1277 skill_registry: Some(Arc::clone(&effective_registry)),
1278 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1279 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1280 circuit_breaker_threshold: opts
1281 .circuit_breaker_threshold
1282 .unwrap_or(base.circuit_breaker_threshold),
1283 auto_compact: opts.auto_compact,
1284 auto_compact_threshold: opts
1285 .auto_compact_threshold
1286 .unwrap_or(crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD),
1287 max_context_tokens: base.max_context_tokens,
1288 memory: memory.clone(),
1289 continuation_enabled: opts
1290 .continuation_enabled
1291 .unwrap_or(base.continuation_enabled),
1292 max_continuation_turns: opts
1293 .max_continuation_turns
1294 .unwrap_or(base.max_continuation_turns),
1295 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1296 ..base
1297 };
1298
1299 {
1303 use crate::tools::register_skill;
1304 register_skill(
1305 tool_executor.registry(),
1306 Arc::clone(&llm_client),
1307 Arc::clone(&effective_registry),
1308 Arc::clone(&tool_executor),
1309 config.clone(),
1310 );
1311 }
1312
1313 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1316 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1317 let session_id = uuid::Uuid::new_v4().to_string();
1318 let rt = tokio::runtime::Handle::try_current();
1319
1320 match rt {
1321 Ok(handle) => {
1322 let queue = tokio::task::block_in_place(|| {
1324 handle.block_on(SessionLaneQueue::new(
1325 &session_id,
1326 queue_config.clone(),
1327 agent_event_tx.clone(),
1328 ))
1329 });
1330 match queue {
1331 Ok(q) => {
1332 let q = Arc::new(q);
1334 let q2 = Arc::clone(&q);
1335 tokio::task::block_in_place(|| {
1336 handle.block_on(async { q2.start().await.ok() })
1337 });
1338 Some(q)
1339 }
1340 Err(e) => {
1341 tracing::warn!("Failed to create session lane queue: {}", e);
1342 None
1343 }
1344 }
1345 }
1346 Err(_) => {
1347 tracing::warn!(
1348 "No async runtime available for queue creation — queue disabled"
1349 );
1350 None
1351 }
1352 }
1353 } else {
1354 None
1355 };
1356
1357 let mut tool_context = ToolContext::new(canonical.clone());
1359 if let Some(ref search_config) = self.code_config.search {
1360 tool_context = tool_context.with_search_config(search_config.clone());
1361 }
1362 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1363
1364 if let Some(handle) = opts.sandbox_handle.clone() {
1366 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1367 tool_context = tool_context.with_sandbox(handle);
1368 }
1369
1370 let session_id = opts
1371 .session_id
1372 .clone()
1373 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1374
1375 let session_store = if opts.session_store.is_some() {
1377 opts.session_store.clone()
1378 } else if let Some(ref dir) = self.code_config.sessions_dir {
1379 match tokio::runtime::Handle::try_current() {
1380 Ok(handle) => {
1381 let dir = dir.clone();
1382 match tokio::task::block_in_place(|| {
1383 handle.block_on(crate::store::FileSessionStore::new(dir))
1384 }) {
1385 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1386 Err(e) => {
1387 tracing::warn!(
1388 "Failed to create session store from sessions_dir: {}",
1389 e
1390 );
1391 None
1392 }
1393 }
1394 }
1395 Err(_) => {
1396 tracing::warn!(
1397 "No async runtime for sessions_dir store — persistence disabled"
1398 );
1399 None
1400 }
1401 }
1402 } else {
1403 None
1404 };
1405
1406 let command_registry = CommandRegistry::new();
1407
1408 Ok(AgentSession {
1409 llm_client,
1410 tool_executor,
1411 tool_context,
1412 memory: config.memory.clone(),
1413 config,
1414 workspace: canonical,
1415 session_id,
1416 history: Arc::new(RwLock::new(Vec::new())),
1417 command_queue,
1418 session_store,
1419 auto_save: opts.auto_save,
1420 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1421 ahp_executor: opts.hook_executor.clone(),
1422 init_warning,
1423 command_registry: std::sync::Mutex::new(command_registry),
1424 model_name: opts
1425 .model
1426 .clone()
1427 .or_else(|| self.code_config.default_model.clone())
1428 .unwrap_or_else(|| "unknown".to_string()),
1429 mcp_manager: opts
1430 .mcp_manager
1431 .clone()
1432 .or_else(|| self.global_mcp.clone())
1433 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1434 agent_registry,
1435 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1436 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1437 trace_sink,
1438 verification_reports: Arc::new(RwLock::new(Vec::new())),
1439 })
1440 }
1441}
1442
1443#[derive(Debug, Clone)]
1452pub struct BtwResult {
1453 pub question: String,
1455 pub answer: String,
1457 pub usage: crate::llm::TokenUsage,
1459}
1460
1461pub struct AgentSession {
1471 llm_client: Arc<dyn LlmClient>,
1472 tool_executor: Arc<ToolExecutor>,
1473 tool_context: ToolContext,
1474 config: AgentConfig,
1475 workspace: PathBuf,
1476 session_id: String,
1478 history: Arc<RwLock<Vec<Message>>>,
1480 command_queue: Option<Arc<SessionLaneQueue>>,
1482 memory: Option<Arc<crate::memory::AgentMemory>>,
1484 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1486 auto_save: bool,
1488 hook_engine: Arc<crate::hooks::HookEngine>,
1490 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1493 init_warning: Option<String>,
1495 command_registry: std::sync::Mutex<CommandRegistry>,
1498 model_name: String,
1500 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1502 agent_registry: Arc<crate::subagent::AgentRegistry>,
1504 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1507 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1509 trace_sink: crate::trace::InMemoryTraceSink,
1511 verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
1513}
1514
1515#[derive(Debug, Clone)]
1516struct ActiveToolSnapshot {
1517 tool_name: String,
1518 started_at_ms: u64,
1519}
1520
1521#[derive(Clone)]
1522struct SessionPersistenceContext {
1523 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1524 session_id: String,
1525 workspace: PathBuf,
1526 config: AgentConfig,
1527 tool_executor: Arc<ToolExecutor>,
1528 trace_sink: crate::trace::InMemoryTraceSink,
1529 history: Arc<RwLock<Vec<Message>>>,
1530 verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
1531 auto_save: bool,
1532}
1533
1534impl SessionPersistenceContext {
1535 fn from_session(session: &AgentSession) -> Self {
1536 Self {
1537 session_store: session.session_store.clone(),
1538 session_id: session.session_id.clone(),
1539 workspace: session.workspace.clone(),
1540 config: session.config.clone(),
1541 tool_executor: Arc::clone(&session.tool_executor),
1542 trace_sink: session.trace_sink.clone(),
1543 history: Arc::clone(&session.history),
1544 verification_reports: Arc::clone(&session.verification_reports),
1545 auto_save: session.auto_save,
1546 }
1547 }
1548
1549 fn record_result(&self, result: &AgentResult) {
1550 *write_or_recover(&self.history) = result.messages.clone();
1551 if !result.verification_reports.is_empty() {
1552 write_or_recover(&self.verification_reports)
1553 .extend(result.verification_reports.clone());
1554 }
1555 }
1556
1557 async fn save(&self) -> Result<()> {
1558 let store = match &self.session_store {
1559 Some(store) => store,
1560 None => return Ok(()),
1561 };
1562
1563 let history = read_or_recover(&self.history).clone();
1564 let verification_reports = read_or_recover(&self.verification_reports).clone();
1565 let now = chrono::Utc::now().timestamp();
1566
1567 let data = crate::store::SessionData {
1568 id: self.session_id.clone(),
1569 config: crate::store::SessionConfig {
1570 name: String::new(),
1571 workspace: self.workspace.display().to_string(),
1572 system_prompt: Some(self.config.prompt_slots.build()),
1573 max_context_length: 200_000,
1574 auto_compact: false,
1575 auto_compact_threshold: crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD,
1576 storage_type: crate::config::StorageBackend::File,
1577 queue_config: None,
1578 confirmation_policy: None,
1579 permission_policy: None,
1580 parent_id: None,
1581 security_config: None,
1582 hook_engine: None,
1583 planning_mode: self.config.planning_mode,
1584 goal_tracking: self.config.goal_tracking,
1585 },
1586 state: crate::store::SessionState::Active,
1587 messages: history,
1588 context_usage: crate::store::ContextUsage::default(),
1589 total_usage: crate::llm::TokenUsage::default(),
1590 total_cost: 0.0,
1591 model_name: None,
1592 cost_records: Vec::new(),
1593 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1594 thinking_enabled: false,
1595 thinking_budget: None,
1596 created_at: now,
1597 updated_at: now,
1598 llm_config: None,
1599 tasks: Vec::new(),
1600 parent_id: None,
1601 };
1602
1603 store.save(&data).await?;
1604 store
1605 .save_artifacts(&self.session_id, &self.tool_executor.artifact_store())
1606 .await?;
1607 store
1608 .save_trace_events(&self.session_id, &self.trace_sink.events())
1609 .await?;
1610 store
1611 .save_verification_reports(&self.session_id, &verification_reports)
1612 .await?;
1613 tracing::debug!("Session {} saved", self.session_id);
1614 Ok(())
1615 }
1616
1617 async fn auto_save_if_enabled(&self) {
1618 if self.auto_save {
1619 if let Err(e) = self.save().await {
1620 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1621 }
1622 }
1623 }
1624}
1625
1626impl std::fmt::Debug for AgentSession {
1627 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1628 f.debug_struct("AgentSession")
1629 .field("session_id", &self.session_id)
1630 .field("workspace", &self.workspace.display().to_string())
1631 .field("auto_save", &self.auto_save)
1632 .finish()
1633 }
1634}
1635
1636impl AgentSession {
1637 fn now_ms() -> u64 {
1638 std::time::SystemTime::now()
1639 .duration_since(std::time::UNIX_EPOCH)
1640 .map(|d| d.as_millis() as u64)
1641 .unwrap_or(0)
1642 }
1643
1644 fn compact_json_value(value: &serde_json::Value) -> String {
1645 let raw = match value {
1646 serde_json::Value::Null => String::new(),
1647 serde_json::Value::String(s) => s.clone(),
1648 _ => serde_json::to_string(value).unwrap_or_default(),
1649 };
1650 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1651 if compact.len() > 180 {
1652 format!("{}...", truncate_utf8(&compact, 180))
1653 } else {
1654 compact
1655 }
1656 }
1657
1658 async fn apply_runtime_event(
1659 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1660 event: &AgentEvent,
1661 ) {
1662 match event {
1663 AgentEvent::ToolStart { id, name } => {
1664 active_tools.write().await.insert(
1665 id.clone(),
1666 ActiveToolSnapshot {
1667 tool_name: name.clone(),
1668 started_at_ms: Self::now_ms(),
1669 },
1670 );
1671 }
1672 AgentEvent::ToolEnd { id, .. }
1673 | AgentEvent::PermissionDenied { tool_id: id, .. }
1674 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1675 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1676 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1677 active_tools.write().await.remove(id);
1678 }
1679 _ => {}
1680 }
1681 }
1682
1683 async fn clear_runtime_tracking(&self) {
1684 self.active_tools.write().await.clear();
1685 }
1686
1687 fn build_agent_loop(&self) -> AgentLoop {
1691 let mut config = self.config.clone();
1692 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1693 ahp.clone()
1694 } else {
1695 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1696 });
1697 config.tools = self.tool_executor.definitions();
1701 let mut agent_loop = AgentLoop::new(
1702 self.llm_client.clone(),
1703 self.tool_executor.clone(),
1704 self.tool_context.clone(),
1705 config,
1706 );
1707 if let Some(ref queue) = self.command_queue {
1708 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1709 }
1710 agent_loop
1711 }
1712
1713 fn build_command_context(&self) -> CommandContext {
1715 let history = read_or_recover(&self.history);
1716
1717 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1719
1720 let mut mcp_map: std::collections::HashMap<String, usize> =
1722 std::collections::HashMap::new();
1723 for name in &tool_names {
1724 if let Some(rest) = name.strip_prefix("mcp__") {
1725 if let Some((server, _)) = rest.split_once("__") {
1726 *mcp_map.entry(server.to_string()).or_default() += 1;
1727 }
1728 }
1729 }
1730 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1731 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1732
1733 CommandContext {
1734 session_id: self.session_id.clone(),
1735 workspace: self.workspace.display().to_string(),
1736 model: self.model_name.clone(),
1737 history_len: history.len(),
1738 total_tokens: 0,
1739 total_cost: 0.0,
1740 tool_names,
1741 mcp_servers,
1742 }
1743 }
1744
1745 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1749 self.command_registry
1750 .lock()
1751 .expect("command_registry lock poisoned")
1752 }
1753
1754 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1758 self.command_registry
1759 .lock()
1760 .expect("command_registry lock poisoned")
1761 .register(cmd);
1762 }
1763
1764 pub async fn close(&self) {
1766 let _ = self.cancel().await;
1767 }
1768
1769 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1778 if CommandRegistry::is_command(prompt) {
1780 let ctx = self.build_command_context();
1781 let output = self.command_registry().dispatch(prompt, &ctx);
1782 if let Some(output) = output {
1784 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1786 let result = self.btw(question).await?;
1787 return Ok(AgentResult {
1788 text: result.answer,
1789 messages: history
1790 .map(|h| h.to_vec())
1791 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1792 tool_calls_count: 0,
1793 usage: result.usage,
1794 verification_reports: Vec::new(),
1795 });
1796 }
1797 return Ok(AgentResult {
1798 text: output.text,
1799 messages: history
1800 .map(|h| h.to_vec())
1801 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1802 tool_calls_count: 0,
1803 usage: crate::llm::TokenUsage::default(),
1804 verification_reports: Vec::new(),
1805 });
1806 }
1807 }
1808
1809 if let Some(ref w) = self.init_warning {
1810 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1811 }
1812 let agent_loop = self.build_agent_loop();
1813 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1814 let runtime_state = Arc::clone(&self.active_tools);
1815 let runtime_collector = tokio::spawn(async move {
1816 while let Some(event) = runtime_rx.recv().await {
1817 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1818 }
1819 });
1820
1821 let use_internal = history.is_none();
1822 let effective_history = match history {
1823 Some(h) => h.to_vec(),
1824 None => read_or_recover(&self.history).clone(),
1825 };
1826
1827 let cancel_token = tokio_util::sync::CancellationToken::new();
1828 *self.cancel_token.lock().await = Some(cancel_token.clone());
1829 let result = agent_loop
1830 .execute_with_session(
1831 &effective_history,
1832 prompt,
1833 Some(&self.session_id),
1834 Some(runtime_tx),
1835 Some(&cancel_token),
1836 )
1837 .await;
1838 *self.cancel_token.lock().await = None;
1839 let _ = runtime_collector.await;
1840 let result = result?;
1841
1842 if use_internal {
1845 *write_or_recover(&self.history) = result.messages.clone();
1846 self.record_verification_reports(result.verification_reports.clone());
1847
1848 if self.auto_save {
1850 if let Err(e) = self.save().await {
1851 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1852 }
1853 }
1854 }
1855
1856 self.clear_runtime_tracking().await;
1857
1858 Ok(result)
1859 }
1860
1861 async fn build_btw_runtime_context(&self) -> String {
1862 let mut sections = Vec::new();
1863
1864 let active_tools = {
1865 let tools = self.active_tools.read().await;
1866 let mut items = tools
1867 .iter()
1868 .map(|(tool_id, tool)| {
1869 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1870 format!(
1871 "- {} [{}] running_for={}ms",
1872 tool.tool_name, tool_id, elapsed_ms
1873 )
1874 })
1875 .collect::<Vec<_>>();
1876 items.sort();
1877 items
1878 };
1879 if !active_tools.is_empty() {
1880 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1881 }
1882
1883 if let Some(cm) = &self.config.confirmation_manager {
1884 let pending = cm.pending_confirmations().await;
1885 if !pending.is_empty() {
1886 let mut lines = pending
1887 .into_iter()
1888 .map(
1889 |PendingConfirmationInfo {
1890 tool_id,
1891 tool_name,
1892 args,
1893 remaining_ms,
1894 }| {
1895 let arg_summary = Self::compact_json_value(&args);
1896 if arg_summary.is_empty() {
1897 format!(
1898 "- {} [{}] remaining={}ms",
1899 tool_name, tool_id, remaining_ms
1900 )
1901 } else {
1902 format!(
1903 "- {} [{}] remaining={}ms {}",
1904 tool_name, tool_id, remaining_ms, arg_summary
1905 )
1906 }
1907 },
1908 )
1909 .collect::<Vec<_>>();
1910 lines.sort();
1911 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1912 }
1913 }
1914
1915 if let Some(queue) = &self.command_queue {
1916 let stats = queue.stats().await;
1917 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1918 let mut lines = vec![format!(
1919 "active={}, pending={}, external_pending={}",
1920 stats.total_active, stats.total_pending, stats.external_pending
1921 )];
1922 let mut lanes = stats
1923 .lanes
1924 .into_values()
1925 .filter(|lane| lane.active > 0 || lane.pending > 0)
1926 .map(|lane| {
1927 format!(
1928 "- {:?}: active={}, pending={}, handler={:?}",
1929 lane.lane, lane.active, lane.pending, lane.handler_mode
1930 )
1931 })
1932 .collect::<Vec<_>>();
1933 lanes.sort();
1934 lines.extend(lanes);
1935 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1936 }
1937
1938 let external_tasks = queue.pending_external_tasks().await;
1939 if !external_tasks.is_empty() {
1940 let mut lines = external_tasks
1941 .into_iter()
1942 .take(6)
1943 .map(|task| {
1944 let payload_summary = Self::compact_json_value(&task.payload);
1945 if payload_summary.is_empty() {
1946 format!(
1947 "- {} {:?} remaining={}ms",
1948 task.command_type,
1949 task.lane,
1950 task.remaining_ms()
1951 )
1952 } else {
1953 format!(
1954 "- {} {:?} remaining={}ms {}",
1955 task.command_type,
1956 task.lane,
1957 task.remaining_ms(),
1958 payload_summary
1959 )
1960 }
1961 })
1962 .collect::<Vec<_>>();
1963 lines.sort();
1964 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1965 }
1966 }
1967
1968 if let Some(store) = &self.session_store {
1969 if let Ok(Some(session)) = store.load(&self.session_id).await {
1970 let active_tasks = session
1971 .tasks
1972 .into_iter()
1973 .filter(|task| task.status.is_active())
1974 .take(6)
1975 .map(|task| match task.tool {
1976 Some(tool) if !tool.is_empty() => {
1977 format!("- [{}] {} ({})", task.status, task.content, tool)
1978 }
1979 _ => format!("- [{}] {}", task.status, task.content),
1980 })
1981 .collect::<Vec<_>>();
1982 if !active_tasks.is_empty() {
1983 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
1984 }
1985 }
1986 }
1987
1988 sections.join("\n\n")
1989 }
1990
1991 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
2009 self.btw_with_context(question, None).await
2010 }
2011
2012 pub async fn btw_with_context(
2017 &self,
2018 question: &str,
2019 runtime_context: Option<&str>,
2020 ) -> Result<BtwResult> {
2021 let question = question.trim();
2022 if question.is_empty() {
2023 return Err(crate::error::CodeError::Session(
2024 "btw: question cannot be empty".to_string(),
2025 ));
2026 }
2027
2028 let history_snapshot = read_or_recover(&self.history).clone();
2030
2031 let mut messages = history_snapshot;
2033 let mut injected_sections = Vec::new();
2034 let session_runtime = self.build_btw_runtime_context().await;
2035 if !session_runtime.is_empty() {
2036 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
2037 }
2038 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
2039 injected_sections.push(format!("[host runtime context]\n{}", extra));
2040 }
2041 if !injected_sections.is_empty() {
2042 let injected_context = format!(
2043 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
2044 injected_sections.join("\n\n")
2045 );
2046 messages.push(Message::user(&injected_context));
2047 }
2048 messages.push(Message::user(question));
2049
2050 let response = self
2051 .llm_client
2052 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2053 .await
2054 .map_err(|e| {
2055 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2056 })?;
2057
2058 Ok(BtwResult {
2059 question: question.to_string(),
2060 answer: response.text(),
2061 usage: response.usage,
2062 })
2063 }
2064
2065 pub async fn send_with_attachments(
2070 &self,
2071 prompt: &str,
2072 attachments: &[crate::llm::Attachment],
2073 history: Option<&[Message]>,
2074 ) -> Result<AgentResult> {
2075 let use_internal = history.is_none();
2079 let mut effective_history = match history {
2080 Some(h) => h.to_vec(),
2081 None => read_or_recover(&self.history).clone(),
2082 };
2083 effective_history.push(Message::user_with_attachments(prompt, attachments));
2084
2085 let agent_loop = self.build_agent_loop();
2086 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2087 let runtime_state = Arc::clone(&self.active_tools);
2088 let runtime_collector = tokio::spawn(async move {
2089 while let Some(event) = runtime_rx.recv().await {
2090 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2091 }
2092 });
2093 let result = agent_loop
2094 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2095 .await?;
2096 let _ = runtime_collector.await;
2097
2098 if use_internal {
2099 *write_or_recover(&self.history) = result.messages.clone();
2100 self.record_verification_reports(result.verification_reports.clone());
2101 if self.auto_save {
2102 if let Err(e) = self.save().await {
2103 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2104 }
2105 }
2106 }
2107
2108 self.clear_runtime_tracking().await;
2109
2110 Ok(result)
2111 }
2112
2113 pub async fn stream_with_attachments(
2118 &self,
2119 prompt: &str,
2120 attachments: &[crate::llm::Attachment],
2121 history: Option<&[Message]>,
2122 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2123 let (tx, rx) = mpsc::channel(256);
2124 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2125 let use_internal = history.is_none();
2126 let mut effective_history = match history {
2127 Some(h) => h.to_vec(),
2128 None => read_or_recover(&self.history).clone(),
2129 };
2130 effective_history.push(Message::user_with_attachments(prompt, attachments));
2131
2132 let agent_loop = self.build_agent_loop();
2133 let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));
2134 let runtime_state = Arc::clone(&self.active_tools);
2135 let forwarder = tokio::spawn(async move {
2136 while let Some(event) = runtime_rx.recv().await {
2137 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2138 if tx.send(event).await.is_err() {
2139 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2142 break;
2143 }
2144 }
2145 });
2146 let handle = tokio::spawn(async move {
2147 let result = agent_loop
2148 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2149 .await;
2150 if let (Some(persistence), Ok(result)) = (persistence, result) {
2151 persistence.record_result(&result);
2152 persistence.auto_save_if_enabled().await;
2153 }
2154 });
2155 let active_tools = Arc::clone(&self.active_tools);
2156 let wrapped_handle = tokio::spawn(async move {
2157 let _ = handle.await;
2158 let _ = forwarder.await;
2159 active_tools.write().await.clear();
2160 });
2161
2162 Ok((rx, wrapped_handle))
2163 }
2164
2165 pub async fn stream(
2174 &self,
2175 prompt: &str,
2176 history: Option<&[Message]>,
2177 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2178 if CommandRegistry::is_command(prompt) {
2180 let ctx = self.build_command_context();
2181 let output = self.command_registry().dispatch(prompt, &ctx);
2182 if let Some(output) = output {
2184 let (tx, rx) = mpsc::channel(256);
2185
2186 if let Some(CommandAction::BtwQuery(question)) = output.action {
2188 let llm_client = self.llm_client.clone();
2190 let history_snapshot = read_or_recover(&self.history).clone();
2191 let handle = tokio::spawn(async move {
2192 let mut messages = history_snapshot;
2193 messages.push(Message::user(&question));
2194 match llm_client
2195 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2196 .await
2197 {
2198 Ok(response) => {
2199 let answer = response.text();
2200 let _ = tx
2201 .send(AgentEvent::BtwAnswer {
2202 question: question.clone(),
2203 answer: answer.clone(),
2204 usage: response.usage,
2205 })
2206 .await;
2207 let _ = tx
2208 .send(AgentEvent::End {
2209 text: answer,
2210 usage: crate::llm::TokenUsage::default(),
2211 verification_summary: Box::new(
2212 crate::verification::VerificationSummary::from_reports(
2213 &[],
2214 ),
2215 ),
2216 meta: None,
2217 })
2218 .await;
2219 }
2220 Err(e) => {
2221 let _ = tx
2222 .send(AgentEvent::Error {
2223 message: format!("btw failed: {e}"),
2224 })
2225 .await;
2226 }
2227 }
2228 });
2229 return Ok((rx, handle));
2230 }
2231
2232 let handle = tokio::spawn(async move {
2233 let _ = tx
2234 .send(AgentEvent::TextDelta {
2235 text: output.text.clone(),
2236 })
2237 .await;
2238 let _ = tx
2239 .send(AgentEvent::End {
2240 text: output.text.clone(),
2241 usage: crate::llm::TokenUsage::default(),
2242 verification_summary: Box::new(
2243 crate::verification::VerificationSummary::from_reports(&[]),
2244 ),
2245 meta: None,
2246 })
2247 .await;
2248 });
2249 return Ok((rx, handle));
2250 }
2251 }
2252
2253 let (tx, rx) = mpsc::channel(256);
2254 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2255 let agent_loop = self.build_agent_loop();
2256 let use_internal = history.is_none();
2257 let effective_history = match history {
2258 Some(h) => h.to_vec(),
2259 None => read_or_recover(&self.history).clone(),
2260 };
2261 let prompt = prompt.to_string();
2262 let session_id = self.session_id.clone();
2263 let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));
2264
2265 let cancel_token = tokio_util::sync::CancellationToken::new();
2266 *self.cancel_token.lock().await = Some(cancel_token.clone());
2267 let token_clone = cancel_token.clone();
2268 let runtime_state = Arc::clone(&self.active_tools);
2269 let forwarder = tokio::spawn(async move {
2270 while let Some(event) = runtime_rx.recv().await {
2271 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2272 if tx.send(event).await.is_err() {
2273 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2276 break;
2277 }
2278 }
2279 });
2280
2281 let handle = tokio::spawn(async move {
2282 let result = agent_loop
2283 .execute_with_session(
2284 &effective_history,
2285 &prompt,
2286 Some(&session_id),
2287 Some(runtime_tx),
2288 Some(&token_clone),
2289 )
2290 .await;
2291 if let (Some(persistence), Ok(result)) = (persistence, result) {
2292 persistence.record_result(&result);
2293 persistence.auto_save_if_enabled().await;
2294 }
2295 });
2296
2297 let cancel_token_ref = self.cancel_token.clone();
2299 let active_tools = Arc::clone(&self.active_tools);
2300 let wrapped_handle = tokio::spawn(async move {
2301 let _ = handle.await;
2302 let _ = forwarder.await;
2303 *cancel_token_ref.lock().await = None;
2304 active_tools.write().await.clear();
2305 });
2306
2307 Ok((rx, wrapped_handle))
2308 }
2309
2310 pub async fn cancel(&self) -> bool {
2317 let token = self.cancel_token.lock().await.clone();
2318 if let Some(token) = token {
2319 token.cancel();
2320 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2321 true
2322 } else {
2323 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2324 false
2325 }
2326 }
2327
2328 pub fn history(&self) -> Vec<Message> {
2330 read_or_recover(&self.history).clone()
2331 }
2332
2333 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2335 self.memory.as_ref()
2336 }
2337
2338 pub fn id(&self) -> &str {
2340 &self.session_id
2341 }
2342
2343 pub fn workspace(&self) -> &std::path::Path {
2345 &self.workspace
2346 }
2347
2348 pub fn init_warning(&self) -> Option<&str> {
2350 self.init_warning.as_deref()
2351 }
2352
2353 pub fn session_id(&self) -> &str {
2355 &self.session_id
2356 }
2357
2358 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2364 self.tool_executor.definitions()
2365 }
2366
2367 pub fn tool_names(&self) -> Vec<String> {
2373 self.tool_executor
2374 .definitions()
2375 .into_iter()
2376 .map(|t| t.name)
2377 .collect()
2378 }
2379
2380 pub fn get_artifact(&self, artifact_uri: &str) -> Option<crate::tools::ToolArtifact> {
2382 self.tool_executor.get_artifact(artifact_uri)
2383 }
2384
2385 pub fn trace_events(&self) -> Vec<crate::trace::TraceEvent> {
2387 self.trace_sink.events()
2388 }
2389
2390 pub fn verification_reports(&self) -> Vec<crate::verification::VerificationReport> {
2392 read_or_recover(&self.verification_reports).clone()
2393 }
2394
2395 pub fn verification_summary(&self) -> crate::verification::VerificationSummary {
2397 crate::verification::VerificationSummary::from_reports(&self.verification_reports())
2398 }
2399
2400 pub fn verification_summary_text(&self) -> String {
2402 crate::verification::format_verification_summary(&self.verification_summary())
2403 }
2404
2405 pub fn record_verification_reports(
2407 &self,
2408 reports: impl IntoIterator<Item = crate::verification::VerificationReport>,
2409 ) {
2410 let mut target = write_or_recover(&self.verification_reports);
2411 target.extend(reports);
2412 }
2413
2414 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2420 self.hook_engine.register(hook);
2421 }
2422
2423 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2425 self.hook_engine.unregister(hook_id)
2426 }
2427
2428 pub fn register_hook_handler(
2430 &self,
2431 hook_id: &str,
2432 handler: Arc<dyn crate::hooks::HookHandler>,
2433 ) {
2434 self.hook_engine.register_handler(hook_id, handler);
2435 }
2436
2437 pub fn unregister_hook_handler(&self, hook_id: &str) {
2439 self.hook_engine.unregister_handler(hook_id);
2440 }
2441
2442 pub fn hook_count(&self) -> usize {
2444 self.hook_engine.hook_count()
2445 }
2446
2447 pub async fn save(&self) -> Result<()> {
2451 SessionPersistenceContext::from_session(self).save().await
2452 }
2453
2454 pub async fn read_file(&self, path: &str) -> Result<String> {
2456 let args = serde_json::json!({ "file_path": path });
2457 let result = self.tool_executor.execute("read", &args).await?;
2458 Ok(result.output)
2459 }
2460
2461 pub async fn bash(&self, command: &str) -> Result<String> {
2467 let args = serde_json::json!({ "command": command });
2468 let result = self
2469 .tool_executor
2470 .execute_with_context("bash", &args, &self.tool_context)
2471 .await?;
2472 Ok(result.output)
2473 }
2474
2475 pub async fn verify_commands(
2477 &self,
2478 subject: &str,
2479 commands: &[crate::verification::VerificationCommand],
2480 ) -> Result<crate::verification::VerificationReport> {
2481 let mut checks = Vec::with_capacity(commands.len());
2482
2483 for command in commands {
2484 let mut args = serde_json::json!({ "command": command.command });
2485 if let Some(timeout_ms) = command.timeout_ms {
2486 args["timeout"] = serde_json::json!(timeout_ms);
2487 }
2488
2489 let check = match self
2490 .tool_executor
2491 .execute_with_context("bash", &args, &self.tool_context)
2492 .await
2493 {
2494 Ok(result) => {
2495 let exit_code = result
2496 .metadata
2497 .as_ref()
2498 .and_then(|metadata| metadata.get("exit_code"))
2499 .and_then(|value| value.as_i64())
2500 .and_then(|value| i32::try_from(value).ok())
2501 .unwrap_or(result.exit_code);
2502 command.check_from_execution(exit_code, result.metadata.as_ref(), None)
2503 }
2504 Err(err) => command.check_from_execution(1, None, Some(&err.to_string())),
2505 };
2506 checks.push(check);
2507 }
2508
2509 let report = crate::verification::VerificationReport::new(subject, checks);
2510 self.record_verification_reports([report.clone()]);
2511 if self.auto_save {
2512 if let Err(e) = self.save().await {
2513 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2514 }
2515 }
2516
2517 Ok(report)
2518 }
2519
2520 pub fn verification_presets(&self) -> Vec<crate::verification::VerificationPreset> {
2522 crate::verification::verification_presets_for_workspace(&self.workspace)
2523 }
2524
2525 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2527 let args = serde_json::json!({ "pattern": pattern });
2528 let result = self.tool_executor.execute("glob", &args).await?;
2529 let files: Vec<String> = result
2530 .output
2531 .lines()
2532 .filter(|l| !l.is_empty())
2533 .map(|l| l.to_string())
2534 .collect();
2535 Ok(files)
2536 }
2537
2538 pub async fn grep(&self, pattern: &str) -> Result<String> {
2540 let args = serde_json::json!({ "pattern": pattern });
2541 let result = self.tool_executor.execute("grep", &args).await?;
2542 Ok(result.output)
2543 }
2544
2545 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2547 let result = self.tool_executor.execute(name, &args).await?;
2548 Ok(ToolCallResult {
2549 name: name.to_string(),
2550 output: result.output,
2551 exit_code: result.exit_code,
2552 metadata: result.metadata,
2553 })
2554 }
2555
2556 pub fn has_queue(&self) -> bool {
2562 self.command_queue.is_some()
2563 }
2564
2565 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2569 if let Some(ref queue) = self.command_queue {
2570 queue.set_lane_handler(lane, config).await;
2571 }
2572 }
2573
2574 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2578 if let Some(ref queue) = self.command_queue {
2579 queue.complete_external_task(task_id, result).await
2580 } else {
2581 false
2582 }
2583 }
2584
2585 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2587 if let Some(ref queue) = self.command_queue {
2588 queue.pending_external_tasks().await
2589 } else {
2590 Vec::new()
2591 }
2592 }
2593
2594 pub async fn queue_stats(&self) -> SessionQueueStats {
2596 if let Some(ref queue) = self.command_queue {
2597 queue.stats().await
2598 } else {
2599 SessionQueueStats::default()
2600 }
2601 }
2602
2603 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2605 if let Some(ref queue) = self.command_queue {
2606 queue.metrics_snapshot().await
2607 } else {
2608 None
2609 }
2610 }
2611
2612 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2614 if let Some(ref queue) = self.command_queue {
2615 queue.dead_letters().await
2616 } else {
2617 Vec::new()
2618 }
2619 }
2620
2621 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2634 use crate::subagent::load_agents_from_dir;
2635 let agents = load_agents_from_dir(dir);
2636 let count = agents.len();
2637 for agent in agents {
2638 tracing::info!(
2639 session_id = %self.session_id,
2640 agent = agent.name,
2641 dir = %dir.display(),
2642 "Dynamically registered agent"
2643 );
2644 self.agent_registry.register(agent);
2645 }
2646 count
2647 }
2648
2649 pub async fn add_mcp_server(
2656 &self,
2657 config: crate::mcp::McpServerConfig,
2658 ) -> crate::error::Result<usize> {
2659 let server_name = config.name.clone();
2660 self.mcp_manager.register_server(config).await;
2661 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2662 crate::error::CodeError::Tool {
2663 tool: server_name.clone(),
2664 message: format!("Failed to connect MCP server: {}", e),
2665 }
2666 })?;
2667
2668 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2669 let count = tools.len();
2670
2671 for tool in
2672 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2673 {
2674 self.tool_executor.register_dynamic_tool(tool);
2675 }
2676
2677 tracing::info!(
2678 session_id = %self.session_id,
2679 server = server_name,
2680 tools = count,
2681 "MCP server added to live session"
2682 );
2683
2684 Ok(count)
2685 }
2686
2687 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2692 self.tool_executor
2693 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2694 self.mcp_manager
2695 .disconnect(server_name)
2696 .await
2697 .map_err(|e| crate::error::CodeError::Tool {
2698 tool: server_name.to_string(),
2699 message: format!("Failed to disconnect MCP server: {}", e),
2700 })?;
2701 tracing::info!(
2702 session_id = %self.session_id,
2703 server = server_name,
2704 "MCP server removed from live session"
2705 );
2706 Ok(())
2707 }
2708
2709 pub async fn mcp_status(
2711 &self,
2712 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2713 self.mcp_manager.get_status().await
2714 }
2715}
2716
2717#[cfg(test)]
2722mod tests {
2723 use super::*;
2724 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2725 use crate::llm::{ContentBlock, LlmResponse, StreamEvent, TokenUsage};
2726 use crate::store::SessionStore;
2727
2728 #[derive(Clone)]
2729 struct StaticStreamingClient {
2730 text: String,
2731 }
2732
2733 impl StaticStreamingClient {
2734 fn new(text: impl Into<String>) -> Self {
2735 Self { text: text.into() }
2736 }
2737
2738 fn response(&self) -> LlmResponse {
2739 LlmResponse {
2740 message: Message {
2741 role: "assistant".to_string(),
2742 content: vec![ContentBlock::Text {
2743 text: self.text.clone(),
2744 }],
2745 reasoning_content: None,
2746 },
2747 usage: TokenUsage {
2748 prompt_tokens: 1,
2749 completion_tokens: 1,
2750 total_tokens: 2,
2751 cache_read_tokens: None,
2752 cache_write_tokens: None,
2753 },
2754 stop_reason: Some("end_turn".to_string()),
2755 meta: None,
2756 }
2757 }
2758 }
2759
2760 #[derive(Clone)]
2761 struct FailingStreamingClient;
2762
2763 #[derive(Clone)]
2764 struct CancellableStreamingClient {
2765 text: String,
2766 }
2767
2768 impl CancellableStreamingClient {
2769 fn new(text: impl Into<String>) -> Self {
2770 Self { text: text.into() }
2771 }
2772 }
2773
2774 #[async_trait::async_trait]
2775 impl LlmClient for StaticStreamingClient {
2776 async fn complete(
2777 &self,
2778 _messages: &[Message],
2779 _system: Option<&str>,
2780 _tools: &[crate::llm::ToolDefinition],
2781 ) -> anyhow::Result<LlmResponse> {
2782 Ok(self.response())
2783 }
2784
2785 async fn complete_streaming(
2786 &self,
2787 _messages: &[Message],
2788 _system: Option<&str>,
2789 _tools: &[crate::llm::ToolDefinition],
2790 _cancel_token: tokio_util::sync::CancellationToken,
2791 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
2792 let (tx, rx) = mpsc::channel(8);
2793 let text = self.text.clone();
2794 let response = self.response();
2795 tokio::spawn(async move {
2796 let _ = tx.send(StreamEvent::TextDelta(text)).await;
2797 let _ = tx.send(StreamEvent::Done(response)).await;
2798 });
2799 Ok(rx)
2800 }
2801 }
2802
2803 #[async_trait::async_trait]
2804 impl LlmClient for FailingStreamingClient {
2805 async fn complete(
2806 &self,
2807 _messages: &[Message],
2808 _system: Option<&str>,
2809 _tools: &[crate::llm::ToolDefinition],
2810 ) -> anyhow::Result<LlmResponse> {
2811 anyhow::bail!("non-streaming fallback failed")
2812 }
2813
2814 async fn complete_streaming(
2815 &self,
2816 _messages: &[Message],
2817 _system: Option<&str>,
2818 _tools: &[crate::llm::ToolDefinition],
2819 _cancel_token: tokio_util::sync::CancellationToken,
2820 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
2821 anyhow::bail!("streaming setup failed")
2822 }
2823 }
2824
2825 #[async_trait::async_trait]
2826 impl LlmClient for CancellableStreamingClient {
2827 async fn complete(
2828 &self,
2829 _messages: &[Message],
2830 _system: Option<&str>,
2831 _tools: &[crate::llm::ToolDefinition],
2832 ) -> anyhow::Result<LlmResponse> {
2833 anyhow::bail!("cancellable client does not support fallback completion")
2834 }
2835
2836 async fn complete_streaming(
2837 &self,
2838 _messages: &[Message],
2839 _system: Option<&str>,
2840 _tools: &[crate::llm::ToolDefinition],
2841 cancel_token: tokio_util::sync::CancellationToken,
2842 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
2843 let (tx, rx) = mpsc::channel(8);
2844 let text = self.text.clone();
2845 tokio::spawn(async move {
2846 let _ = tx.send(StreamEvent::TextDelta(text)).await;
2847 cancel_token.cancelled().await;
2848 });
2849 Ok(rx)
2850 }
2851 }
2852
2853 fn test_config() -> CodeConfig {
2854 CodeConfig {
2855 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2856 providers: vec![
2857 ProviderConfig {
2858 name: "anthropic".to_string(),
2859 api_key: Some("test-key".to_string()),
2860 base_url: None,
2861 headers: std::collections::HashMap::new(),
2862 session_id_header: None,
2863 models: vec![ModelConfig {
2864 id: "claude-sonnet-4-20250514".to_string(),
2865 name: "Claude Sonnet 4".to_string(),
2866 family: "claude-sonnet".to_string(),
2867 api_key: None,
2868 base_url: None,
2869 headers: std::collections::HashMap::new(),
2870 session_id_header: None,
2871 attachment: false,
2872 reasoning: false,
2873 tool_call: true,
2874 temperature: true,
2875 release_date: None,
2876 modalities: ModelModalities::default(),
2877 cost: Default::default(),
2878 limit: Default::default(),
2879 }],
2880 },
2881 ProviderConfig {
2882 name: "openai".to_string(),
2883 api_key: Some("test-openai-key".to_string()),
2884 base_url: None,
2885 headers: std::collections::HashMap::new(),
2886 session_id_header: None,
2887 models: vec![ModelConfig {
2888 id: "gpt-4o".to_string(),
2889 name: "GPT-4o".to_string(),
2890 family: "gpt-4".to_string(),
2891 api_key: None,
2892 base_url: None,
2893 headers: std::collections::HashMap::new(),
2894 session_id_header: None,
2895 attachment: false,
2896 reasoning: false,
2897 tool_call: true,
2898 temperature: true,
2899 release_date: None,
2900 modalities: ModelModalities::default(),
2901 cost: Default::default(),
2902 limit: Default::default(),
2903 }],
2904 },
2905 ],
2906 ..Default::default()
2907 }
2908 }
2909
2910 fn build_effective_registry_for_test(
2911 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2912 opts: &SessionOptions,
2913 ) -> Arc<crate::skills::SkillRegistry> {
2914 let base_registry = agent_registry
2915 .as_deref()
2916 .map(|r| r.fork())
2917 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2918 if let Some(ref r) = opts.skill_registry {
2919 for skill in r.all() {
2920 base_registry.register_unchecked(skill);
2921 }
2922 }
2923 for dir in &opts.skill_dirs {
2924 if let Err(e) = base_registry.load_from_dir(dir) {
2925 tracing::warn!(
2926 dir = %dir.display(),
2927 error = %e,
2928 "Failed to load session skill dir — skipping"
2929 );
2930 }
2931 }
2932 Arc::new(base_registry)
2933 }
2934
2935 #[tokio::test]
2936 async fn test_from_config() {
2937 let agent = Agent::from_config(test_config()).await;
2938 assert!(agent.is_ok());
2939 }
2940
2941 #[tokio::test]
2942 async fn test_session_default() {
2943 let agent = Agent::from_config(test_config()).await.unwrap();
2944 let session = agent.session("/tmp/test-workspace", None);
2945 assert!(session.is_ok());
2946 let debug = format!("{:?}", session.unwrap());
2947 assert!(debug.contains("AgentSession"));
2948 }
2949
2950 #[tokio::test]
2951 async fn test_session_routes_agents_md_through_context_provider() {
2952 let temp_dir = tempfile::tempdir().unwrap();
2953 std::fs::write(
2954 temp_dir.path().join("AGENTS.md"),
2955 "Always run focused tests before reporting completion.",
2956 )
2957 .unwrap();
2958
2959 let agent = Agent::from_config(test_config()).await.unwrap();
2960 let session = agent
2961 .session(temp_dir.path().display().to_string(), None)
2962 .unwrap();
2963
2964 let agents_provider = session
2965 .config
2966 .context_providers
2967 .iter()
2968 .find(|provider| provider.name() == "agents_md")
2969 .expect("AGENTS.md provider should be registered");
2970 assert!(!session
2971 .config
2972 .prompt_slots
2973 .extra
2974 .as_deref()
2975 .unwrap_or_default()
2976 .contains("Project Instructions (AGENTS.md)"));
2977
2978 let result = agents_provider
2979 .query(&crate::context::ContextQuery::new("complete the task"))
2980 .await
2981 .unwrap();
2982
2983 assert_eq!(result.items.len(), 1);
2984 assert_eq!(result.items[0].id, "agents_md");
2985 assert!(result.items[0]
2986 .content
2987 .contains("Always run focused tests before reporting completion."));
2988 assert_eq!(result.items[0].relevance, 0.95);
2989 }
2990
2991 #[tokio::test]
2992 async fn test_session_registers_agentic_tools_by_default() {
2993 let agent = Agent::from_config(test_config()).await.unwrap();
2996 let _session = agent.session("/tmp/test-workspace", None).unwrap();
2997 }
2999
3000 #[tokio::test]
3001 async fn test_session_can_disable_agentic_tools_via_config() {
3002 let mut config = test_config();
3005 config.agentic_search = Some(crate::config::AgenticSearchConfig {
3006 enabled: false,
3007 ..Default::default()
3008 });
3009 config.agentic_parse = Some(crate::config::AgenticParseConfig {
3010 enabled: false,
3011 ..Default::default()
3012 });
3013
3014 let agent = Agent::from_config(config).await.unwrap();
3015 let _session = agent.session("/tmp/test-workspace", None).unwrap();
3016 }
3018
3019 #[tokio::test]
3020 async fn test_session_with_model_override() {
3021 let agent = Agent::from_config(test_config()).await.unwrap();
3022 let opts = SessionOptions::new().with_model("openai/gpt-4o");
3023 let session = agent.session("/tmp/test-workspace", Some(opts));
3024 assert!(session.is_ok());
3025 }
3026
3027 #[tokio::test]
3028 async fn test_session_with_invalid_model_format() {
3029 let agent = Agent::from_config(test_config()).await.unwrap();
3030 let opts = SessionOptions::new().with_model("gpt-4o");
3031 let session = agent.session("/tmp/test-workspace", Some(opts));
3032 assert!(session.is_err());
3033 }
3034
3035 #[tokio::test]
3036 async fn test_session_with_model_not_found() {
3037 let agent = Agent::from_config(test_config()).await.unwrap();
3038 let opts = SessionOptions::new().with_model("openai/nonexistent");
3039 let session = agent.session("/tmp/test-workspace", Some(opts));
3040 assert!(session.is_err());
3041 }
3042
3043 #[tokio::test]
3044 async fn test_session_preserves_skill_scorer_from_agent_registry() {
3045 use crate::skills::feedback::{
3046 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
3047 };
3048 use crate::skills::{Skill, SkillKind, SkillRegistry};
3049
3050 let registry = Arc::new(SkillRegistry::new());
3051 let scorer = Arc::new(DefaultSkillScorer::default());
3052 registry.set_scorer(scorer.clone());
3053
3054 registry.register_unchecked(Arc::new(Skill {
3055 name: "healthy-skill".to_string(),
3056 description: "healthy".to_string(),
3057 allowed_tools: None,
3058 disable_model_invocation: false,
3059 kind: SkillKind::Instruction,
3060 content: "healthy".to_string(),
3061 tags: vec![],
3062 version: None,
3063 }));
3064 registry.register_unchecked(Arc::new(Skill {
3065 name: "disabled-skill".to_string(),
3066 description: "disabled".to_string(),
3067 allowed_tools: None,
3068 disable_model_invocation: false,
3069 kind: SkillKind::Instruction,
3070 content: "disabled".to_string(),
3071 tags: vec![],
3072 version: None,
3073 }));
3074
3075 for _ in 0..5 {
3076 scorer.record(SkillFeedback {
3077 skill_name: "disabled-skill".to_string(),
3078 outcome: SkillOutcome::Failure,
3079 score_delta: -1.0,
3080 reason: "bad".to_string(),
3081 timestamp: 0,
3082 });
3083 }
3084
3085 let effective_registry =
3086 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
3087 let matches = effective_registry.search("healthy disabled", 10);
3088 let names = matches
3089 .iter()
3090 .map(|skill| skill.name.as_str())
3091 .collect::<Vec<_>>();
3092
3093 assert!(effective_registry.scorer().is_some());
3094 assert!(names.contains(&"healthy-skill"));
3095 assert!(!names.contains(&"disabled-skill"));
3096 }
3097
3098 #[tokio::test]
3099 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
3100 use crate::skills::validator::DefaultSkillValidator;
3101 use crate::skills::SkillRegistry;
3102
3103 let registry = Arc::new(SkillRegistry::new());
3104 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
3105
3106 let temp_dir = tempfile::tempdir().unwrap();
3107 let invalid_skill = temp_dir.path().join("invalid.md");
3108 std::fs::write(
3109 &invalid_skill,
3110 r#"---
3111name: BadName
3112description: "invalid skill name"
3113kind: instruction
3114---
3115# Invalid Skill
3116"#,
3117 )
3118 .unwrap();
3119
3120 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
3121 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
3122 assert!(effective_registry.get("BadName").is_none());
3123 }
3124
3125 #[tokio::test]
3126 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
3127 use crate::skills::{Skill, SkillKind, SkillRegistry};
3128
3129 let registry = Arc::new(SkillRegistry::new());
3130 registry.register_unchecked(Arc::new(Skill {
3131 name: "shared-skill".to_string(),
3132 description: "agent level".to_string(),
3133 allowed_tools: None,
3134 disable_model_invocation: false,
3135 kind: SkillKind::Instruction,
3136 content: "agent content".to_string(),
3137 tags: vec![],
3138 version: None,
3139 }));
3140
3141 let session_registry = Arc::new(SkillRegistry::new());
3142 session_registry.register_unchecked(Arc::new(Skill {
3143 name: "shared-skill".to_string(),
3144 description: "session level".to_string(),
3145 allowed_tools: None,
3146 disable_model_invocation: false,
3147 kind: SkillKind::Instruction,
3148 content: "session content".to_string(),
3149 tags: vec![],
3150 version: None,
3151 }));
3152
3153 let opts = SessionOptions::new().with_skill_registry(session_registry);
3154 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
3155
3156 assert_eq!(
3157 effective_registry.get("shared-skill").unwrap().content,
3158 "session content"
3159 );
3160 assert_eq!(
3161 registry.get("shared-skill").unwrap().content,
3162 "agent content"
3163 );
3164 }
3165
3166 #[tokio::test]
3167 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3168 use crate::skills::{Skill, SkillKind, SkillRegistry};
3169
3170 let session_registry = Arc::new(SkillRegistry::new());
3171 session_registry.register_unchecked(Arc::new(Skill {
3172 name: "shared-skill".to_string(),
3173 description: "session registry".to_string(),
3174 allowed_tools: None,
3175 disable_model_invocation: false,
3176 kind: SkillKind::Instruction,
3177 content: "registry content".to_string(),
3178 tags: vec![],
3179 version: None,
3180 }));
3181
3182 let temp_dir = tempfile::tempdir().unwrap();
3183 std::fs::write(
3184 temp_dir.path().join("shared.md"),
3185 r#"---
3186name: shared-skill
3187description: "skill dir override"
3188kind: instruction
3189---
3190# Shared Skill
3191dir content
3192"#,
3193 )
3194 .unwrap();
3195 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3196
3197 let opts = SessionOptions::new()
3198 .with_skill_registry(session_registry)
3199 .with_skill_dirs([temp_dir.path()]);
3200 let effective_registry = build_effective_registry_for_test(None, &opts);
3201
3202 assert_eq!(
3203 effective_registry.get("shared-skill").unwrap().description,
3204 "skill dir override"
3205 );
3206 assert!(effective_registry.get("README").is_none());
3207 }
3208
3209 #[tokio::test]
3210 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3211 use crate::plugin::{Plugin, PluginContext};
3212 use crate::skills::{Skill, SkillKind, SkillRegistry};
3213 use crate::tools::ToolRegistry;
3214
3215 struct SessionOnlySkillPlugin;
3216
3217 impl Plugin for SessionOnlySkillPlugin {
3218 fn name(&self) -> &str {
3219 "session-only-skill"
3220 }
3221
3222 fn version(&self) -> &str {
3223 "0.1.0"
3224 }
3225
3226 fn tool_names(&self) -> &[&str] {
3227 &[]
3228 }
3229
3230 fn load(
3231 &self,
3232 _registry: &Arc<ToolRegistry>,
3233 _ctx: &PluginContext,
3234 ) -> anyhow::Result<()> {
3235 Ok(())
3236 }
3237
3238 fn skills(&self) -> Vec<Arc<Skill>> {
3239 vec![Arc::new(Skill {
3240 name: "plugin-session-skill".to_string(),
3241 description: "plugin skill".to_string(),
3242 allowed_tools: None,
3243 disable_model_invocation: false,
3244 kind: SkillKind::Instruction,
3245 content: "plugin content".to_string(),
3246 tags: vec!["plugin".to_string()],
3247 version: None,
3248 })]
3249 }
3250 }
3251
3252 let mut agent = Agent::from_config(test_config()).await.unwrap();
3253 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3254 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3255
3256 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3257 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3258
3259 let session_registry = session.config.skill_registry.as_ref().unwrap();
3260 assert!(session_registry.get("plugin-session-skill").is_some());
3261 assert!(agent_registry.get("plugin-session-skill").is_none());
3262 }
3263
3264 #[tokio::test]
3265 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3266 use crate::skills::{Skill, SkillKind, SkillRegistry};
3267
3268 let mut agent = Agent::from_config(test_config()).await.unwrap();
3269 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3270 agent.config.skill_registry = Some(agent_registry);
3271
3272 let session_registry = Arc::new(SkillRegistry::new());
3273 session_registry.register_unchecked(Arc::new(Skill {
3274 name: "session-only".to_string(),
3275 description: "only for first session".to_string(),
3276 allowed_tools: None,
3277 disable_model_invocation: false,
3278 kind: SkillKind::Instruction,
3279 content: "session one".to_string(),
3280 tags: vec![],
3281 version: None,
3282 }));
3283
3284 let session_one = agent
3285 .session(
3286 "/tmp/test-workspace",
3287 Some(SessionOptions::new().with_skill_registry(session_registry)),
3288 )
3289 .unwrap();
3290 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3291
3292 assert!(session_one
3293 .config
3294 .skill_registry
3295 .as_ref()
3296 .unwrap()
3297 .get("session-only")
3298 .is_some());
3299 assert!(session_two
3300 .config
3301 .skill_registry
3302 .as_ref()
3303 .unwrap()
3304 .get("session-only")
3305 .is_none());
3306 }
3307
3308 #[tokio::test]
3309 async fn test_plugin_skills_do_not_leak_across_sessions() {
3310 use crate::plugin::{Plugin, PluginContext};
3311 use crate::skills::{Skill, SkillKind, SkillRegistry};
3312 use crate::tools::ToolRegistry;
3313
3314 struct LeakyPlugin;
3315
3316 impl Plugin for LeakyPlugin {
3317 fn name(&self) -> &str {
3318 "leaky-plugin"
3319 }
3320
3321 fn version(&self) -> &str {
3322 "0.1.0"
3323 }
3324
3325 fn tool_names(&self) -> &[&str] {
3326 &[]
3327 }
3328
3329 fn load(
3330 &self,
3331 _registry: &Arc<ToolRegistry>,
3332 _ctx: &PluginContext,
3333 ) -> anyhow::Result<()> {
3334 Ok(())
3335 }
3336
3337 fn skills(&self) -> Vec<Arc<Skill>> {
3338 vec![Arc::new(Skill {
3339 name: "plugin-only".to_string(),
3340 description: "plugin only".to_string(),
3341 allowed_tools: None,
3342 disable_model_invocation: false,
3343 kind: SkillKind::Instruction,
3344 content: "plugin skill".to_string(),
3345 tags: vec![],
3346 version: None,
3347 })]
3348 }
3349 }
3350
3351 let mut agent = Agent::from_config(test_config()).await.unwrap();
3352 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3353
3354 let session_one = agent
3355 .session(
3356 "/tmp/test-workspace",
3357 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3358 )
3359 .unwrap();
3360 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3361
3362 assert!(session_one
3363 .config
3364 .skill_registry
3365 .as_ref()
3366 .unwrap()
3367 .get("plugin-only")
3368 .is_some());
3369 assert!(session_two
3370 .config
3371 .skill_registry
3372 .as_ref()
3373 .unwrap()
3374 .get("plugin-only")
3375 .is_none());
3376 }
3377
3378 #[tokio::test]
3379 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3380 use crate::skills::{Skill, SkillKind, SkillRegistry};
3381 use crate::subagent::AgentDefinition;
3382
3383 let mut agent = Agent::from_config(test_config()).await.unwrap();
3384 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3385
3386 let definition = AgentDefinition::new("reviewer", "Review code")
3387 .with_prompt("Agent definition prompt")
3388 .with_max_steps(7);
3389
3390 let session_registry = Arc::new(SkillRegistry::new());
3391 session_registry.register_unchecked(Arc::new(Skill {
3392 name: "agent-session-skill".to_string(),
3393 description: "agent session only".to_string(),
3394 allowed_tools: None,
3395 disable_model_invocation: false,
3396 kind: SkillKind::Instruction,
3397 content: "agent session content".to_string(),
3398 tags: vec![],
3399 version: None,
3400 }));
3401
3402 let session_one = agent
3403 .session_for_agent(
3404 "/tmp/test-workspace",
3405 &definition,
3406 Some(SessionOptions::new().with_skill_registry(session_registry)),
3407 )
3408 .unwrap();
3409 let session_two = agent
3410 .session_for_agent("/tmp/test-workspace", &definition, None)
3411 .unwrap();
3412
3413 assert_eq!(session_one.config.max_tool_rounds, 7);
3414 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3415 assert!(extra.contains("Agent definition prompt"));
3416 assert!(!extra.contains("agent-session-skill"));
3417 assert!(session_one
3418 .config
3419 .context_providers
3420 .iter()
3421 .any(|provider| provider.name() == "skills_catalog"));
3422 assert!(session_one
3423 .config
3424 .skill_registry
3425 .as_ref()
3426 .unwrap()
3427 .get("agent-session-skill")
3428 .is_some());
3429 assert!(session_two
3430 .config
3431 .skill_registry
3432 .as_ref()
3433 .unwrap()
3434 .get("agent-session-skill")
3435 .is_none());
3436 }
3437
3438 #[tokio::test]
3439 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3440 ) {
3441 use crate::prompts::SystemPromptSlots;
3442 use crate::subagent::AgentDefinition;
3443
3444 let agent = Agent::from_config(test_config()).await.unwrap();
3445 let definition = AgentDefinition::new("planner", "Plan work")
3446 .with_prompt("Definition extra prompt")
3447 .with_max_steps(3);
3448
3449 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3450 style: None,
3451 role: Some("Custom role".to_string()),
3452 guidelines: None,
3453 response_style: None,
3454 extra: None,
3455 });
3456
3457 let session = agent
3458 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3459 .unwrap();
3460
3461 assert_eq!(
3462 session.config.prompt_slots.role.as_deref(),
3463 Some("Custom role")
3464 );
3465 assert!(session
3466 .config
3467 .prompt_slots
3468 .extra
3469 .as_deref()
3470 .unwrap()
3471 .contains("Definition extra prompt"));
3472 assert_eq!(session.config.max_tool_rounds, 3);
3473 }
3474
3475 #[tokio::test]
3476 async fn test_new_with_acl_string() {
3477 let acl = r#"
3478 default_model = "anthropic/claude-sonnet-4-20250514"
3479 providers "anthropic" {
3480 apiKey = "test-key"
3481 models "claude-sonnet-4-20250514" {
3482 name = "Claude Sonnet 4"
3483 }
3484 }
3485 "#;
3486 let agent = Agent::new(acl).await;
3487 assert!(agent.is_ok());
3488 }
3489
3490 #[tokio::test]
3491 async fn test_create_alias_acl() {
3492 let acl = r#"
3493 default_model = "anthropic/claude-sonnet-4-20250514"
3494 providers "anthropic" {
3495 apiKey = "test-key"
3496 models "claude-sonnet-4-20250514" {
3497 name = "Claude Sonnet 4"
3498 }
3499 }
3500 "#;
3501 let agent = Agent::create(acl).await;
3502 assert!(agent.is_ok());
3503 }
3504
3505 #[tokio::test]
3506 async fn test_create_and_new_produce_same_result() {
3507 let acl = r#"
3508 default_model = "anthropic/claude-sonnet-4-20250514"
3509 providers "anthropic" {
3510 apiKey = "test-key"
3511 models "claude-sonnet-4-20250514" {
3512 name = "Claude Sonnet 4"
3513 }
3514 }
3515 "#;
3516 let agent_new = Agent::new(acl).await;
3517 let agent_create = Agent::create(acl).await;
3518 assert!(agent_new.is_ok());
3519 assert!(agent_create.is_ok());
3520
3521 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3523 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3524 assert!(session_new.is_ok());
3525 assert!(session_create.is_ok());
3526 }
3527
3528 #[tokio::test]
3529 async fn test_new_with_existing_acl_file_uses_file_loading() {
3530 let temp_dir = tempfile::tempdir().unwrap();
3531 let config_path = temp_dir.path().join("agent.acl");
3532 std::fs::write(&config_path, "providers {").unwrap();
3533
3534 let err = Agent::new(config_path.display().to_string())
3535 .await
3536 .unwrap_err();
3537 let msg = err.to_string();
3538
3539 assert!(msg.contains("Failed to load config"));
3540 assert!(msg.contains("agent.acl"));
3541 assert!(!msg.contains("Failed to parse config as ACL string"));
3542 }
3543
3544 #[tokio::test]
3545 async fn test_new_with_missing_acl_file_reports_not_found() {
3546 let temp_dir = tempfile::tempdir().unwrap();
3547 let missing_path = temp_dir.path().join("agent.acl");
3548
3549 let err = Agent::new(missing_path.display().to_string())
3550 .await
3551 .unwrap_err();
3552 let msg = err.to_string();
3553
3554 assert!(msg.contains("Config file not found"));
3555 assert!(msg.contains("agent.acl"));
3556 assert!(!msg.contains("Failed to parse config as ACL string"));
3557 }
3558
3559 #[tokio::test]
3560 async fn test_new_rejects_hcl_files() {
3561 let temp_dir = tempfile::tempdir().unwrap();
3562 let config_path = temp_dir.path().join("agent.hcl");
3563 std::fs::write(&config_path, "default_model = \"openai/test\"").unwrap();
3564
3565 let err = Agent::new(config_path.display().to_string())
3566 .await
3567 .unwrap_err();
3568 let msg = err.to_string();
3569
3570 assert!(msg.contains("HCL config files are not supported in 2.0"));
3571 assert!(msg.contains(".acl"));
3572 }
3573
3574 #[test]
3575 fn test_from_config_requires_default_model() {
3576 let rt = tokio::runtime::Runtime::new().unwrap();
3577 let config = CodeConfig {
3578 providers: vec![ProviderConfig {
3579 name: "anthropic".to_string(),
3580 api_key: Some("test-key".to_string()),
3581 base_url: None,
3582 headers: std::collections::HashMap::new(),
3583 session_id_header: None,
3584 models: vec![],
3585 }],
3586 ..Default::default()
3587 };
3588 let result = rt.block_on(Agent::from_config(config));
3589 assert!(result.is_err());
3590 }
3591
3592 #[tokio::test]
3593 async fn test_history_empty_on_new_session() {
3594 let agent = Agent::from_config(test_config()).await.unwrap();
3595 let session = agent.session("/tmp/test-workspace", None).unwrap();
3596 assert!(session.history().is_empty());
3597 }
3598
3599 #[tokio::test]
3600 async fn test_stream_updates_history_and_auto_saves() {
3601 let store = Arc::new(crate::store::MemorySessionStore::new());
3602 let agent = Agent::from_config(test_config()).await.unwrap();
3603 let opts = SessionOptions::new()
3604 .with_session_store(store.clone())
3605 .with_session_id("stream-history-test")
3606 .with_auto_save(true);
3607 let session = agent
3608 .build_session(
3609 "/tmp/test-stream-history".into(),
3610 Arc::new(StaticStreamingClient::new("streamed answer")),
3611 &opts,
3612 )
3613 .unwrap();
3614
3615 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3616 let mut saw_end = false;
3617 while let Some(event) = rx.recv().await {
3618 if matches!(event, AgentEvent::End { .. }) {
3619 saw_end = true;
3620 break;
3621 }
3622 }
3623 handle.await.unwrap();
3624
3625 assert!(saw_end);
3626 let history = session.history();
3627 assert_eq!(history.len(), 2);
3628 assert_eq!(history[0].text(), "hello");
3629 assert_eq!(history[1].text(), "streamed answer");
3630
3631 let saved = store
3632 .load("stream-history-test")
3633 .await
3634 .unwrap()
3635 .expect("saved session");
3636 assert_eq!(saved.messages.len(), 2);
3637 assert_eq!(saved.messages[1].text(), "streamed answer");
3638 }
3639
3640 #[tokio::test]
3641 async fn test_stream_with_custom_history_does_not_update_session_history() {
3642 let agent = Agent::from_config(test_config()).await.unwrap();
3643 let session = agent
3644 .build_session(
3645 "/tmp/test-stream-custom-history".into(),
3646 Arc::new(StaticStreamingClient::new("custom history answer")),
3647 &SessionOptions::new(),
3648 )
3649 .unwrap();
3650 let custom_history = vec![Message::user("custom prompt")];
3651
3652 let (mut rx, handle) = session
3653 .stream("ignored", Some(&custom_history))
3654 .await
3655 .unwrap();
3656 while let Some(event) = rx.recv().await {
3657 if matches!(event, AgentEvent::End { .. }) {
3658 break;
3659 }
3660 }
3661 handle.await.unwrap();
3662
3663 assert!(session.history().is_empty());
3664 }
3665
3666 #[tokio::test]
3667 async fn test_stream_error_does_not_update_history_or_auto_save() {
3668 let store = Arc::new(crate::store::MemorySessionStore::new());
3669 let agent = Agent::from_config(test_config()).await.unwrap();
3670 let opts = SessionOptions::new()
3671 .with_session_store(store.clone())
3672 .with_session_id("stream-error-test")
3673 .with_auto_save(true);
3674 let session = agent
3675 .build_session(
3676 "/tmp/test-stream-error".into(),
3677 Arc::new(FailingStreamingClient),
3678 &opts,
3679 )
3680 .unwrap();
3681
3682 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3683 let mut saw_error = false;
3684 while let Some(event) = rx.recv().await {
3685 if matches!(event, AgentEvent::Error { .. }) {
3686 saw_error = true;
3687 break;
3688 }
3689 }
3690 handle.await.unwrap();
3691
3692 assert!(saw_error);
3693 assert!(session.history().is_empty());
3694 assert!(store.load("stream-error-test").await.unwrap().is_none());
3695 }
3696
3697 #[tokio::test]
3698 async fn test_stream_cancel_does_not_update_history_or_auto_save() {
3699 let store = Arc::new(crate::store::MemorySessionStore::new());
3700 let agent = Agent::from_config(test_config()).await.unwrap();
3701 let opts = SessionOptions::new()
3702 .with_session_store(store.clone())
3703 .with_session_id("stream-cancel-test")
3704 .with_auto_save(true);
3705 let session = agent
3706 .build_session(
3707 "/tmp/test-stream-cancel".into(),
3708 Arc::new(CancellableStreamingClient::new("partial answer")),
3709 &opts,
3710 )
3711 .unwrap();
3712
3713 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3714 let mut saw_delta = false;
3715 for _ in 0..16 {
3716 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
3717 .await
3718 .expect("stream event before timeout")
3719 .expect("stream should stay open until cancelled");
3720 if matches!(event, AgentEvent::TextDelta { ref text } if text == "partial answer") {
3721 saw_delta = true;
3722 break;
3723 }
3724 }
3725 assert!(saw_delta);
3726 assert!(session.cancel().await);
3727
3728 while rx.recv().await.is_some() {}
3729 handle.await.unwrap();
3730
3731 assert!(session.history().is_empty());
3732 assert!(store.load("stream-cancel-test").await.unwrap().is_none());
3733 assert!(!session.cancel().await);
3734 }
3735
3736 #[tokio::test]
3737 async fn test_session_options_with_agent_dir() {
3738 let opts = SessionOptions::new()
3739 .with_agent_dir("/tmp/agents")
3740 .with_agent_dir("/tmp/more-agents");
3741 assert_eq!(opts.agent_dirs.len(), 2);
3742 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3743 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3744 }
3745
3746 #[test]
3751 fn test_session_options_with_queue_config() {
3752 let qc = SessionQueueConfig::default().with_lane_features();
3753 let opts = SessionOptions::new().with_queue_config(qc.clone());
3754 assert!(opts.queue_config.is_some());
3755
3756 let config = opts.queue_config.unwrap();
3757 assert!(config.enable_dlq);
3758 assert!(config.enable_metrics);
3759 assert!(config.enable_alerts);
3760 assert_eq!(config.default_timeout_ms, Some(60_000));
3761 }
3762
3763 #[tokio::test]
3764 async fn test_session_uses_single_delegation_tool_surface() {
3765 let agent = Agent::from_config(test_config()).await.unwrap();
3766 let session = agent
3767 .session("/tmp/test-workspace-delegation-tools", None)
3768 .unwrap();
3769 let names = session.tool_names();
3770
3771 assert!(names.contains(&"task".to_string()));
3772 assert!(names.contains(&"parallel_task".to_string()));
3773 assert!(!names.contains(&"run_team".to_string()));
3774 }
3775
3776 #[tokio::test(flavor = "multi_thread")]
3777 async fn test_session_with_queue_config() {
3778 let agent = Agent::from_config(test_config()).await.unwrap();
3779 let qc = SessionQueueConfig::default();
3780 let opts = SessionOptions::new().with_queue_config(qc);
3781 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3782 assert!(session.is_ok());
3783 let session = session.unwrap();
3784 assert!(session.has_queue());
3785 }
3786
3787 #[tokio::test]
3788 async fn test_session_without_queue_config() {
3789 let agent = Agent::from_config(test_config()).await.unwrap();
3790 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3791 assert!(!session.has_queue());
3792 }
3793
3794 #[tokio::test]
3795 async fn test_session_queue_stats_without_queue() {
3796 let agent = Agent::from_config(test_config()).await.unwrap();
3797 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3798 let stats = session.queue_stats().await;
3799 assert_eq!(stats.total_pending, 0);
3801 assert_eq!(stats.total_active, 0);
3802 }
3803
3804 #[tokio::test(flavor = "multi_thread")]
3805 async fn test_session_queue_stats_with_queue() {
3806 let agent = Agent::from_config(test_config()).await.unwrap();
3807 let qc = SessionQueueConfig::default();
3808 let opts = SessionOptions::new().with_queue_config(qc);
3809 let session = agent
3810 .session("/tmp/test-workspace-qstats", Some(opts))
3811 .unwrap();
3812 let stats = session.queue_stats().await;
3813 assert_eq!(stats.total_pending, 0);
3815 assert_eq!(stats.total_active, 0);
3816 }
3817
3818 #[tokio::test(flavor = "multi_thread")]
3819 async fn test_session_pending_external_tasks_empty() {
3820 let agent = Agent::from_config(test_config()).await.unwrap();
3821 let qc = SessionQueueConfig::default();
3822 let opts = SessionOptions::new().with_queue_config(qc);
3823 let session = agent
3824 .session("/tmp/test-workspace-ext", Some(opts))
3825 .unwrap();
3826 let tasks = session.pending_external_tasks().await;
3827 assert!(tasks.is_empty());
3828 }
3829
3830 #[tokio::test(flavor = "multi_thread")]
3831 async fn test_session_dead_letters_empty() {
3832 let agent = Agent::from_config(test_config()).await.unwrap();
3833 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3834 let opts = SessionOptions::new().with_queue_config(qc);
3835 let session = agent
3836 .session("/tmp/test-workspace-dlq", Some(opts))
3837 .unwrap();
3838 let dead = session.dead_letters().await;
3839 assert!(dead.is_empty());
3840 }
3841
3842 #[tokio::test(flavor = "multi_thread")]
3843 async fn test_session_queue_metrics_disabled() {
3844 let agent = Agent::from_config(test_config()).await.unwrap();
3845 let qc = SessionQueueConfig::default();
3847 let opts = SessionOptions::new().with_queue_config(qc);
3848 let session = agent
3849 .session("/tmp/test-workspace-nomet", Some(opts))
3850 .unwrap();
3851 let metrics = session.queue_metrics().await;
3852 assert!(metrics.is_none());
3853 }
3854
3855 #[tokio::test(flavor = "multi_thread")]
3856 async fn test_session_queue_metrics_enabled() {
3857 let agent = Agent::from_config(test_config()).await.unwrap();
3858 let qc = SessionQueueConfig::default().with_metrics();
3859 let opts = SessionOptions::new().with_queue_config(qc);
3860 let session = agent
3861 .session("/tmp/test-workspace-met", Some(opts))
3862 .unwrap();
3863 let metrics = session.queue_metrics().await;
3864 assert!(metrics.is_some());
3865 }
3866
3867 #[tokio::test(flavor = "multi_thread")]
3868 async fn test_session_set_lane_handler() {
3869 let agent = Agent::from_config(test_config()).await.unwrap();
3870 let qc = SessionQueueConfig::default();
3871 let opts = SessionOptions::new().with_queue_config(qc);
3872 let session = agent
3873 .session("/tmp/test-workspace-handler", Some(opts))
3874 .unwrap();
3875
3876 session
3878 .set_lane_handler(
3879 SessionLane::Execute,
3880 LaneHandlerConfig {
3881 mode: crate::queue::TaskHandlerMode::External,
3882 timeout_ms: 30_000,
3883 },
3884 )
3885 .await;
3886
3887 }
3890
3891 #[tokio::test(flavor = "multi_thread")]
3896 async fn test_session_has_id() {
3897 let agent = Agent::from_config(test_config()).await.unwrap();
3898 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3899 assert!(!session.session_id().is_empty());
3901 assert_eq!(session.session_id().len(), 36); }
3903
3904 #[tokio::test(flavor = "multi_thread")]
3905 async fn test_session_explicit_id() {
3906 let agent = Agent::from_config(test_config()).await.unwrap();
3907 let opts = SessionOptions::new().with_session_id("my-session-42");
3908 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3909 assert_eq!(session.session_id(), "my-session-42");
3910 }
3911
3912 #[tokio::test(flavor = "multi_thread")]
3913 async fn test_session_artifact_store_limits_option() {
3914 let agent = Agent::from_config(test_config()).await.unwrap();
3915 let opts =
3916 SessionOptions::new().with_artifact_store_limits(crate::tools::ArtifactStoreLimits {
3917 max_artifacts: 3,
3918 max_bytes: 4096,
3919 });
3920 let session = agent
3921 .session("/tmp/test-ws-artifact-limits", Some(opts))
3922 .unwrap();
3923
3924 let limits = session.tool_executor.artifact_store().limits();
3925 assert_eq!(limits.max_artifacts, 3);
3926 assert_eq!(limits.max_bytes, 4096);
3927 }
3928
3929 #[tokio::test(flavor = "multi_thread")]
3930 async fn test_session_save_no_store() {
3931 let agent = Agent::from_config(test_config()).await.unwrap();
3932 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3933 session.save().await.unwrap();
3935 }
3936
3937 #[tokio::test(flavor = "multi_thread")]
3938 async fn test_session_save_and_load() {
3939 let store = Arc::new(crate::store::MemorySessionStore::new());
3940 let agent = Agent::from_config(test_config()).await.unwrap();
3941
3942 let opts = SessionOptions::new()
3943 .with_session_store(store.clone())
3944 .with_session_id("persist-test");
3945 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3946
3947 session.save().await.unwrap();
3949
3950 assert!(store.exists("persist-test").await.unwrap());
3952
3953 let data = store.load("persist-test").await.unwrap().unwrap();
3954 assert_eq!(data.id, "persist-test");
3955 assert!(data.messages.is_empty());
3956 }
3957
3958 #[tokio::test(flavor = "multi_thread")]
3959 async fn test_session_save_with_history() {
3960 let store = Arc::new(crate::store::MemorySessionStore::new());
3961 let agent = Agent::from_config(test_config()).await.unwrap();
3962
3963 let opts = SessionOptions::new()
3964 .with_session_store(store.clone())
3965 .with_session_id("history-test");
3966 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3967
3968 {
3970 let mut h = session.history.write().unwrap();
3971 h.push(Message::user("Hello"));
3972 h.push(Message::user("How are you?"));
3973 }
3974
3975 session.save().await.unwrap();
3976
3977 let data = store.load("history-test").await.unwrap().unwrap();
3978 assert_eq!(data.messages.len(), 2);
3979 }
3980
3981 #[tokio::test(flavor = "multi_thread")]
3982 async fn test_resume_session() {
3983 let store = Arc::new(crate::store::MemorySessionStore::new());
3984 let agent = Agent::from_config(test_config()).await.unwrap();
3985
3986 let opts = SessionOptions::new()
3988 .with_session_store(store.clone())
3989 .with_session_id("resume-test");
3990 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3991 {
3992 let mut h = session.history.write().unwrap();
3993 h.push(Message::user("What is Rust?"));
3994 h.push(Message::user("Tell me more"));
3995 }
3996 session.save().await.unwrap();
3997
3998 let opts2 = SessionOptions::new().with_session_store(store.clone());
4000 let resumed = agent.resume_session("resume-test", opts2).unwrap();
4001
4002 assert_eq!(resumed.session_id(), "resume-test");
4003 let history = resumed.history();
4004 assert_eq!(history.len(), 2);
4005 assert_eq!(history[0].text(), "What is Rust?");
4006 }
4007
4008 #[tokio::test(flavor = "multi_thread")]
4009 async fn test_resume_session_restores_artifacts() {
4010 let store = Arc::new(crate::store::MemorySessionStore::new());
4011 let agent = Agent::from_config(test_config()).await.unwrap();
4012
4013 let opts = SessionOptions::new()
4014 .with_session_store(store.clone())
4015 .with_session_id("resume-artifacts-test");
4016 let session = agent.session("/tmp/test-ws-artifacts", Some(opts)).unwrap();
4017 session
4018 .tool_executor
4019 .artifact_store()
4020 .put(crate::tools::ToolArtifact {
4021 artifact_id: "tool-output:test:a".to_string(),
4022 artifact_uri: "a3s://tool-output/test/a".to_string(),
4023 tool_name: "test".to_string(),
4024 content: "artifact content".to_string(),
4025 original_bytes: 16,
4026 shown_bytes: 4,
4027 });
4028
4029 session.save().await.unwrap();
4030 let opts2 = SessionOptions::new().with_session_store(store.clone());
4031 let resumed = agent
4032 .resume_session("resume-artifacts-test", opts2)
4033 .unwrap();
4034
4035 let artifact = resumed
4036 .get_artifact("a3s://tool-output/test/a")
4037 .expect("artifact");
4038 assert_eq!(artifact.content, "artifact content");
4039 }
4040
4041 #[tokio::test(flavor = "multi_thread")]
4042 async fn test_resume_session_restores_trace_events() {
4043 let store = Arc::new(crate::store::MemorySessionStore::new());
4044 let agent = Agent::from_config(test_config()).await.unwrap();
4045 let event = crate::trace::TraceEvent::tool_execution(
4046 "read",
4047 true,
4048 0,
4049 std::time::Duration::from_millis(3),
4050 32,
4051 Some(&serde_json::json!({
4052 "artifact": {
4053 "artifact_uri": "a3s://tool-output/read/abc"
4054 }
4055 })),
4056 );
4057
4058 let opts = SessionOptions::new()
4059 .with_session_store(store.clone())
4060 .with_session_id("resume-trace-test");
4061 let session = agent.session("/tmp/test-ws-trace", Some(opts)).unwrap();
4062 session.trace_sink.replace_events(vec![event.clone()]);
4063 session.save().await.unwrap();
4064
4065 let opts2 = SessionOptions::new().with_session_store(store.clone());
4066 let resumed = agent.resume_session("resume-trace-test", opts2).unwrap();
4067
4068 assert_eq!(resumed.trace_events(), vec![event]);
4069 }
4070
4071 #[tokio::test(flavor = "multi_thread")]
4072 async fn test_resume_session_restores_verification_reports() {
4073 let store = Arc::new(crate::store::MemorySessionStore::new());
4074 let agent = Agent::from_config(test_config()).await.unwrap();
4075 let report = crate::verification::VerificationReport::new(
4076 "program:test",
4077 vec![crate::verification::VerificationCheck::required(
4078 "check:test",
4079 "test",
4080 "Run tests",
4081 )
4082 .with_status(crate::verification::VerificationStatus::Passed)],
4083 );
4084
4085 let opts = SessionOptions::new()
4086 .with_session_store(store.clone())
4087 .with_session_id("resume-verification-test");
4088 let session = agent
4089 .session("/tmp/test-ws-verification", Some(opts))
4090 .unwrap();
4091 session.record_verification_reports([report.clone()]);
4092 session.save().await.unwrap();
4093
4094 let opts2 = SessionOptions::new().with_session_store(store.clone());
4095 let resumed = agent
4096 .resume_session("resume-verification-test", opts2)
4097 .unwrap();
4098
4099 assert_eq!(resumed.verification_reports(), vec![report]);
4100 assert_eq!(
4101 resumed.verification_summary().status,
4102 crate::verification::VerificationStatus::Passed
4103 );
4104 assert!(resumed
4105 .verification_summary_text()
4106 .contains("Verification passed"));
4107 }
4108
4109 #[tokio::test(flavor = "multi_thread")]
4110 async fn test_verify_commands_builds_report_from_bash_results() {
4111 let temp_dir = tempfile::tempdir().unwrap();
4112 let agent = Agent::from_config(test_config()).await.unwrap();
4113 let session = agent
4114 .session(temp_dir.path().display().to_string(), None)
4115 .unwrap();
4116 let commands = vec![
4117 crate::verification::VerificationCommand::required(
4118 "check:smoke",
4119 "smoke",
4120 "Run smoke command",
4121 "printf ok",
4122 ),
4123 crate::verification::VerificationCommand::required(
4124 "check:failure",
4125 "smoke",
4126 "Run failing command",
4127 "exit 7",
4128 ),
4129 ];
4130
4131 let report = session.verify_commands("turn", &commands).await.unwrap();
4132
4133 assert_eq!(report.subject, "turn");
4134 assert_eq!(
4135 report.status,
4136 crate::verification::VerificationStatus::Failed
4137 );
4138 assert_eq!(
4139 report.checks[0].status,
4140 crate::verification::VerificationStatus::Passed
4141 );
4142 assert_eq!(
4143 report.checks[1].status,
4144 crate::verification::VerificationStatus::Failed
4145 );
4146 assert_eq!(
4147 report.checks[1].residual_risk.as_deref(),
4148 Some("verification command exited with code 7: exit 7")
4149 );
4150 assert_eq!(session.verification_reports(), vec![report]);
4151 assert_eq!(
4152 session.verification_summary().status,
4153 crate::verification::VerificationStatus::Failed
4154 );
4155 }
4156
4157 #[tokio::test(flavor = "multi_thread")]
4158 async fn test_session_verification_presets_reflect_workspace() {
4159 let temp_dir = tempfile::tempdir().unwrap();
4160 std::fs::write(
4161 temp_dir.path().join("package.json"),
4162 r#"{"scripts":{"test":"vitest","typecheck":"tsc --noEmit"}}"#,
4163 )
4164 .unwrap();
4165 let agent = Agent::from_config(test_config()).await.unwrap();
4166 let session = agent
4167 .session(temp_dir.path().display().to_string(), None)
4168 .unwrap();
4169
4170 let presets = session.verification_presets();
4171
4172 assert_eq!(presets.len(), 1);
4173 assert_eq!(presets[0].project_kind, "node");
4174 assert_eq!(presets[0].commands[0].command, "npm test");
4175 assert_eq!(presets[0].commands[1].command, "npm run typecheck");
4176 }
4177
4178 #[tokio::test(flavor = "multi_thread")]
4179 async fn test_resume_session_not_found() {
4180 let store = Arc::new(crate::store::MemorySessionStore::new());
4181 let agent = Agent::from_config(test_config()).await.unwrap();
4182
4183 let opts = SessionOptions::new().with_session_store(store.clone());
4184 let result = agent.resume_session("nonexistent", opts);
4185 assert!(result.is_err());
4186 assert!(result.unwrap_err().to_string().contains("not found"));
4187 }
4188
4189 #[tokio::test(flavor = "multi_thread")]
4190 async fn test_resume_session_no_store() {
4191 let agent = Agent::from_config(test_config()).await.unwrap();
4192 let opts = SessionOptions::new();
4193 let result = agent.resume_session("any-id", opts);
4194 assert!(result.is_err());
4195 assert!(result.unwrap_err().to_string().contains("session_store"));
4196 }
4197
4198 #[tokio::test(flavor = "multi_thread")]
4199 async fn test_file_session_store_persistence() {
4200 let dir = tempfile::TempDir::new().unwrap();
4201 let store = Arc::new(
4202 crate::store::FileSessionStore::new(dir.path())
4203 .await
4204 .unwrap(),
4205 );
4206 let agent = Agent::from_config(test_config()).await.unwrap();
4207
4208 let opts = SessionOptions::new()
4210 .with_session_store(store.clone())
4211 .with_session_id("file-persist");
4212 let session = agent
4213 .session("/tmp/test-ws-file-persist", Some(opts))
4214 .unwrap();
4215 {
4216 let mut h = session.history.write().unwrap();
4217 h.push(Message::user("test message"));
4218 }
4219 session.save().await.unwrap();
4220
4221 let store2 = Arc::new(
4223 crate::store::FileSessionStore::new(dir.path())
4224 .await
4225 .unwrap(),
4226 );
4227 let data = store2.load("file-persist").await.unwrap().unwrap();
4228 assert_eq!(data.messages.len(), 1);
4229 }
4230
4231 #[tokio::test(flavor = "multi_thread")]
4232 async fn test_session_options_builders() {
4233 let opts = SessionOptions::new()
4234 .with_session_id("test-id")
4235 .with_auto_save(true);
4236 assert_eq!(opts.session_id, Some("test-id".to_string()));
4237 assert!(opts.auto_save);
4238 }
4239
4240 #[tokio::test(flavor = "multi_thread")]
4245 async fn test_session_with_memory_store() {
4246 use a3s_memory::InMemoryStore;
4247 let store = Arc::new(InMemoryStore::new());
4248 let agent = Agent::from_config(test_config()).await.unwrap();
4249 let opts = SessionOptions::new().with_memory(store);
4250 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
4251 assert!(session.memory().is_some());
4252 }
4253
4254 #[tokio::test(flavor = "multi_thread")]
4255 async fn test_session_without_memory_store() {
4256 let agent = Agent::from_config(test_config()).await.unwrap();
4257 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
4258 assert!(session.memory().is_none());
4259 }
4260
4261 #[tokio::test(flavor = "multi_thread")]
4262 async fn test_session_memory_wired_into_config() {
4263 use a3s_memory::InMemoryStore;
4264 let store = Arc::new(InMemoryStore::new());
4265 let agent = Agent::from_config(test_config()).await.unwrap();
4266 let opts = SessionOptions::new().with_memory(store);
4267 let session = agent
4268 .session("/tmp/test-ws-mem-config", Some(opts))
4269 .unwrap();
4270 assert!(session.memory().is_some());
4272 }
4273
4274 #[tokio::test(flavor = "multi_thread")]
4275 async fn test_session_with_file_memory() {
4276 let dir = tempfile::TempDir::new().unwrap();
4277 let agent = Agent::from_config(test_config()).await.unwrap();
4278 let opts = SessionOptions::new().with_file_memory(dir.path());
4279 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
4280 assert!(session.memory().is_some());
4281 }
4282
4283 #[tokio::test(flavor = "multi_thread")]
4284 async fn test_memory_remember_and_recall() {
4285 use a3s_memory::InMemoryStore;
4286 let store = Arc::new(InMemoryStore::new());
4287 let agent = Agent::from_config(test_config()).await.unwrap();
4288 let opts = SessionOptions::new().with_memory(store);
4289 let session = agent
4290 .session("/tmp/test-ws-mem-recall", Some(opts))
4291 .unwrap();
4292
4293 let memory = session.memory().unwrap();
4294 memory
4295 .remember_success("write a file", &["write".to_string()], "done")
4296 .await
4297 .unwrap();
4298
4299 let results = memory.recall_similar("write", 5).await.unwrap();
4300 assert!(!results.is_empty());
4301 let stats = memory.stats().await.unwrap();
4302 assert_eq!(stats.long_term_count, 1);
4303 }
4304
4305 #[tokio::test(flavor = "multi_thread")]
4310 async fn test_session_tool_timeout_configured() {
4311 let agent = Agent::from_config(test_config()).await.unwrap();
4312 let opts = SessionOptions::new().with_tool_timeout(5000);
4313 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
4314 assert!(!session.id().is_empty());
4315 }
4316
4317 #[tokio::test(flavor = "multi_thread")]
4322 async fn test_session_without_queue_builds_ok() {
4323 let agent = Agent::from_config(test_config()).await.unwrap();
4324 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
4325 assert!(!session.id().is_empty());
4326 }
4327
4328 #[tokio::test(flavor = "multi_thread")]
4333 async fn test_concurrent_history_reads() {
4334 let agent = Agent::from_config(test_config()).await.unwrap();
4335 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
4336
4337 let handles: Vec<_> = (0..10)
4338 .map(|_| {
4339 let s = Arc::clone(&session);
4340 tokio::spawn(async move { s.history().len() })
4341 })
4342 .collect();
4343
4344 for h in handles {
4345 h.await.unwrap();
4346 }
4347 }
4348
4349 #[tokio::test(flavor = "multi_thread")]
4354 async fn test_session_no_init_warning_without_file_memory() {
4355 let agent = Agent::from_config(test_config()).await.unwrap();
4356 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
4357 assert!(session.init_warning().is_none());
4358 }
4359
4360 #[tokio::test(flavor = "multi_thread")]
4361 async fn test_register_agent_dir_loads_agents_into_live_session() {
4362 let temp_dir = tempfile::tempdir().unwrap();
4363
4364 std::fs::write(
4366 temp_dir.path().join("my-agent.yaml"),
4367 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
4368 )
4369 .unwrap();
4370
4371 let agent = Agent::from_config(test_config()).await.unwrap();
4372 let session = agent.session(".", None).unwrap();
4373
4374 assert!(!session.agent_registry.exists("my-dynamic-agent"));
4376
4377 let count = session.register_agent_dir(temp_dir.path());
4378 assert_eq!(count, 1);
4379 assert!(session.agent_registry.exists("my-dynamic-agent"));
4380 }
4381
4382 #[tokio::test(flavor = "multi_thread")]
4383 async fn test_register_agent_dir_empty_dir_returns_zero() {
4384 let temp_dir = tempfile::tempdir().unwrap();
4385 let agent = Agent::from_config(test_config()).await.unwrap();
4386 let session = agent.session(".", None).unwrap();
4387 let count = session.register_agent_dir(temp_dir.path());
4388 assert_eq!(count, 0);
4389 }
4390
4391 #[tokio::test(flavor = "multi_thread")]
4392 async fn test_register_agent_dir_nonexistent_returns_zero() {
4393 let agent = Agent::from_config(test_config()).await.unwrap();
4394 let session = agent.session(".", None).unwrap();
4395 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
4396 assert_eq!(count, 0);
4397 }
4398
4399 #[tokio::test(flavor = "multi_thread")]
4400 async fn test_session_with_mcp_manager_builds_ok() {
4401 use crate::mcp::manager::McpManager;
4402 let mcp = Arc::new(McpManager::new());
4403 let agent = Agent::from_config(test_config()).await.unwrap();
4404 let opts = SessionOptions::new().with_mcp(mcp);
4405 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
4407 assert!(!session.id().is_empty());
4408 }
4409
4410 #[test]
4411 fn test_session_command_is_available_from_queue_module() {
4412 use crate::queue::SessionCommand;
4414 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
4415 }
4416}