1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandAction, CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::context::{ContextItem, ContextType, StaticContextProvider};
23use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
24use crate::hitl::PendingConfirmationInfo;
25use crate::llm::{LlmClient, Message};
26use crate::prompts::{PlanningMode, SystemPromptSlots};
27use crate::queue::{
28 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
29 SessionQueueStats,
30};
31use crate::session_lane_queue::SessionLaneQueue;
32use crate::text::truncate_utf8;
33use crate::tools::{ToolContext, ToolExecutor};
34use a3s_lane::{DeadLetter, MetricsSnapshot};
35use a3s_memory::{FileMemoryStore, MemoryStore};
36use anyhow::Context;
37use std::collections::HashMap;
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, RwLock};
40use tokio::sync::{broadcast, mpsc};
41use tokio::task::JoinHandle;
42
43fn safe_canonicalize(path: &Path) -> PathBuf {
46 match std::fs::canonicalize(path) {
47 Ok(p) => strip_unc_prefix(p),
48 Err(_) => path.to_path_buf(),
49 }
50}
51
/// Removes the Windows verbatim (`\\?\`) prefix from `path`.
///
/// On Windows:
/// * `\\?\UNC\server\share\...` becomes `\\server\share\...`. Dropping only
///   `\\?\` would leave the relative path `UNC\server\share\...`.
/// * `\\?\C:\...` becomes `C:\...`.
/// * Paths that are not valid UTF-8 are returned untouched, so a lossy
///   string conversion can never alter them.
///
/// On every other platform the path is returned unchanged.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        if let Some(text) = path.to_str() {
            // The verbatim-UNC form must be checked first: it also starts with `\\?\`.
            if let Some(rest) = text.strip_prefix(r"\\?\UNC\") {
                return PathBuf::from(format!(r"\\{rest}"));
            }
            if let Some(rest) = text.strip_prefix(r"\\?\") {
                return PathBuf::from(rest);
            }
        }
    }
    path
}
64
65#[derive(Debug, Clone)]
71pub struct ToolCallResult {
72 pub name: String,
73 pub output: String,
74 pub exit_code: i32,
75 pub metadata: Option<serde_json::Value>,
76}
77
78#[derive(Clone, Default)]
84pub struct SessionOptions {
85 pub model: Option<String>,
87 pub agent_dirs: Vec<PathBuf>,
90 pub queue_config: Option<SessionQueueConfig>,
95 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
97 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
99 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
101 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
103 pub planning_mode: PlanningMode,
105 pub goal_tracking: bool,
107 pub skill_dirs: Vec<PathBuf>,
110 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
112 pub memory_store: Option<Arc<dyn MemoryStore>>,
114 pub(crate) file_memory_dir: Option<PathBuf>,
116 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
118 pub session_id: Option<String>,
120 pub auto_save: bool,
122 pub artifact_store_limits: Option<crate::tools::ArtifactStoreLimits>,
124 pub max_parse_retries: Option<u32>,
127 pub tool_timeout_ms: Option<u64>,
130 pub circuit_breaker_threshold: Option<u32>,
134 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
140 pub auto_compact: bool,
142 pub auto_compact_threshold: Option<f32>,
145 pub continuation_enabled: Option<bool>,
148 pub max_continuation_turns: Option<u32>,
151 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
156 pub temperature: Option<f32>,
158 pub thinking_budget: Option<usize>,
160 pub max_tool_rounds: Option<usize>,
166 pub prompt_slots: Option<SystemPromptSlots>,
172 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
179}
180
181impl std::fmt::Debug for SessionOptions {
182 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
183 f.debug_struct("SessionOptions")
184 .field("model", &self.model)
185 .field("agent_dirs", &self.agent_dirs)
186 .field("skill_dirs", &self.skill_dirs)
187 .field("queue_config", &self.queue_config)
188 .field("security_provider", &self.security_provider.is_some())
189 .field("context_providers", &self.context_providers.len())
190 .field("confirmation_manager", &self.confirmation_manager.is_some())
191 .field("permission_checker", &self.permission_checker.is_some())
192 .field("planning_mode", &self.planning_mode)
193 .field("goal_tracking", &self.goal_tracking)
194 .field(
195 "skill_registry",
196 &self
197 .skill_registry
198 .as_ref()
199 .map(|r| format!("{} skills", r.len())),
200 )
201 .field("memory_store", &self.memory_store.is_some())
202 .field("session_store", &self.session_store.is_some())
203 .field("session_id", &self.session_id)
204 .field("auto_save", &self.auto_save)
205 .field("artifact_store_limits", &self.artifact_store_limits)
206 .field("max_parse_retries", &self.max_parse_retries)
207 .field("tool_timeout_ms", &self.tool_timeout_ms)
208 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
209 .field("sandbox_handle", &self.sandbox_handle.is_some())
210 .field("auto_compact", &self.auto_compact)
211 .field("auto_compact_threshold", &self.auto_compact_threshold)
212 .field("continuation_enabled", &self.continuation_enabled)
213 .field("max_continuation_turns", &self.max_continuation_turns)
214 .field("mcp_manager", &self.mcp_manager.is_some())
215 .field("temperature", &self.temperature)
216 .field("thinking_budget", &self.thinking_budget)
217 .field("max_tool_rounds", &self.max_tool_rounds)
218 .field("prompt_slots", &self.prompt_slots.is_some())
219 .finish()
220 }
221}
222
223impl SessionOptions {
224 pub fn new() -> Self {
225 Self::default()
226 }
227
228 pub fn with_model(mut self, model: impl Into<String>) -> Self {
229 self.model = Some(model.into());
230 self
231 }
232
233 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
234 self.agent_dirs.push(dir.into());
235 self
236 }
237
238 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
239 self.queue_config = Some(config);
240 self
241 }
242
243 pub fn with_default_security(mut self) -> Self {
245 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
246 self
247 }
248
249 pub fn with_security_provider(
251 mut self,
252 provider: Arc<dyn crate::security::SecurityProvider>,
253 ) -> Self {
254 self.security_provider = Some(provider);
255 self
256 }
257
258 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
260 let config = crate::context::FileSystemContextConfig::new(root_path);
261 self.context_providers
262 .push(Arc::new(crate::context::FileSystemContextProvider::new(
263 config,
264 )));
265 self
266 }
267
268 pub fn with_context_provider(
270 mut self,
271 provider: Arc<dyn crate::context::ContextProvider>,
272 ) -> Self {
273 self.context_providers.push(provider);
274 self
275 }
276
277 pub fn with_confirmation_manager(
279 mut self,
280 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
281 ) -> Self {
282 self.confirmation_manager = Some(manager);
283 self
284 }
285
286 pub fn with_permission_checker(
288 mut self,
289 checker: Arc<dyn crate::permissions::PermissionChecker>,
290 ) -> Self {
291 self.permission_checker = Some(checker);
292 self
293 }
294
295 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
297 self.planning_mode = mode;
298 self
299 }
300
301 pub fn with_planning(mut self, enabled: bool) -> Self {
303 self.planning_mode = if enabled {
304 PlanningMode::Enabled
305 } else {
306 PlanningMode::Disabled
307 };
308 self
309 }
310
311 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
313 self.goal_tracking = enabled;
314 self
315 }
316
317 pub fn with_builtin_skills(mut self) -> Self {
319 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
320 self
321 }
322
323 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
325 self.skill_registry = Some(registry);
326 self
327 }
328
329 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
332 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
333 self
334 }
335
336 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
338 let registry = self
339 .skill_registry
340 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
341 if let Err(e) = registry.load_from_dir(&dir) {
342 tracing::warn!(
343 dir = %dir.as_ref().display(),
344 error = %e,
345 "Failed to load skills from directory — continuing without them"
346 );
347 }
348 self.skill_registry = Some(registry);
349 self
350 }
351
352 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
354 self.memory_store = Some(store);
355 self
356 }
357
358 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
364 self.file_memory_dir = Some(dir.into());
365 self
366 }
367
368 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
370 self.session_store = Some(store);
371 self
372 }
373
374 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
376 let dir = dir.into();
377 match tokio::runtime::Handle::try_current() {
378 Ok(handle) => {
379 match tokio::task::block_in_place(|| {
380 handle.block_on(crate::store::FileSessionStore::new(dir))
381 }) {
382 Ok(store) => {
383 self.session_store =
384 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
385 }
386 Err(e) => {
387 tracing::warn!("Failed to create file session store: {}", e);
388 }
389 }
390 }
391 Err(_) => {
392 tracing::warn!(
393 "No async runtime available for file session store — persistence disabled"
394 );
395 }
396 }
397 self
398 }
399
400 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
402 self.session_id = Some(id.into());
403 self
404 }
405
406 pub fn with_auto_save(mut self, enabled: bool) -> Self {
408 self.auto_save = enabled;
409 self
410 }
411
412 pub fn with_artifact_store_limits(mut self, limits: crate::tools::ArtifactStoreLimits) -> Self {
414 self.artifact_store_limits = Some(limits);
415 self
416 }
417
418 pub fn with_parse_retries(mut self, max: u32) -> Self {
424 self.max_parse_retries = Some(max);
425 self
426 }
427
428 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
434 self.tool_timeout_ms = Some(timeout_ms);
435 self
436 }
437
438 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
444 self.circuit_breaker_threshold = Some(threshold);
445 self
446 }
447
448 pub fn with_resilience_defaults(self) -> Self {
454 self.with_parse_retries(2)
455 .with_tool_timeout(120_000)
456 .with_circuit_breaker(3)
457 }
458
459 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
467 self.sandbox_handle = Some(handle);
468 self
469 }
470
471 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
476 self.auto_compact = enabled;
477 self
478 }
479
480 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
482 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
483 self
484 }
485
486 pub fn with_continuation(mut self, enabled: bool) -> Self {
491 self.continuation_enabled = Some(enabled);
492 self
493 }
494
495 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
497 self.max_continuation_turns = Some(turns);
498 self
499 }
500
501 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
506 self.mcp_manager = Some(manager);
507 self
508 }
509
510 pub fn with_temperature(mut self, temperature: f32) -> Self {
511 self.temperature = Some(temperature);
512 self
513 }
514
515 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
516 self.thinking_budget = Some(budget);
517 self
518 }
519
520 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
525 self.max_tool_rounds = Some(rounds);
526 self
527 }
528
529 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
534 self.prompt_slots = Some(slots);
535 self
536 }
537
538 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
544 self.hook_executor = Some(executor);
545 self
546 }
547}
548
549pub struct Agent {
558 code_config: CodeConfig,
559 config: AgentConfig,
560 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
562 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
565}
566
567impl std::fmt::Debug for Agent {
568 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
569 f.debug_struct("Agent").finish()
570 }
571}
572
573impl Agent {
574 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
578 let source = config_source.into();
579
580 let expanded = if let Some(rest) = source.strip_prefix("~/") {
582 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
583 if let Some(home) = home {
584 PathBuf::from(home).join(rest).display().to_string()
585 } else {
586 source.clone()
587 }
588 } else {
589 source.clone()
590 };
591
592 let path = Path::new(&expanded);
593
594 let ext = path.extension().and_then(|ext| ext.to_str());
595
596 let config = if matches!(ext, Some("acl")) {
597 if !path.exists() {
598 return Err(CodeError::Config(format!(
599 "Config file not found: {}",
600 path.display()
601 )));
602 }
603
604 CodeConfig::from_file(path)
605 .with_context(|| format!("Failed to load config: {}", path.display()))?
606 } else if matches!(ext, Some("hcl")) {
607 return Err(CodeError::Config(
608 "HCL config files are not supported in 2.0; rename the file to .acl".into(),
609 ));
610 } else if source.trim().starts_with('{') {
611 return Err(CodeError::Config(
612 "JSON config is not supported; use ACL-compatible .acl config".into(),
613 ));
614 } else if matches!(ext, Some("json")) {
615 return Err(CodeError::Config(
616 "JSON config files are not supported; use .acl".into(),
617 ));
618 } else {
619 CodeConfig::from_acl(&source).context("Failed to parse config as ACL string")?
620 };
621
622 Self::from_config(config).await
623 }
624
625 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
630 Self::new(config_source).await
631 }
632
633 pub async fn from_config(config: CodeConfig) -> Result<Self> {
635 config
636 .default_llm_config()
637 .context("default_model must be set in 'provider/model' format with a valid API key")?;
638
639 let agent_config = AgentConfig {
640 max_tool_rounds: config
641 .max_tool_rounds
642 .unwrap_or(AgentConfig::default().max_tool_rounds),
643 ..AgentConfig::default()
644 };
645
646 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
648 (None, vec![])
649 } else {
650 let manager = Arc::new(crate::mcp::manager::McpManager::new());
651 for server in &config.mcp_servers {
652 if !server.enabled {
653 continue;
654 }
655 manager.register_server(server.clone()).await;
656 if let Err(e) = manager.connect(&server.name).await {
657 tracing::warn!(
658 server = %server.name,
659 error = %e,
660 "Failed to connect to MCP server — skipping"
661 );
662 }
663 }
664 let tools = manager.get_all_tools().await;
666 (Some(manager), tools)
667 };
668
669 let mut agent = Agent {
670 code_config: config,
671 config: agent_config,
672 global_mcp,
673 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
674 };
675
676 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
678 for dir in &agent.code_config.skill_dirs.clone() {
679 if let Err(e) = registry.load_from_dir(dir) {
680 tracing::warn!(
681 dir = %dir.display(),
682 error = %e,
683 "Failed to load skills from directory — skipping"
684 );
685 }
686 }
687 agent.config.skill_registry = Some(registry);
688
689 Ok(agent)
690 }
691
692 pub async fn refresh_mcp_tools(&self) -> Result<()> {
700 if let Some(ref mcp) = self.global_mcp {
701 let fresh = mcp.get_all_tools().await;
702 *self
703 .global_mcp_tools
704 .lock()
705 .expect("global_mcp_tools lock poisoned") = fresh;
706 }
707 Ok(())
708 }
709
710 pub fn session(
715 &self,
716 workspace: impl Into<String>,
717 options: Option<SessionOptions>,
718 ) -> Result<AgentSession> {
719 let opts = options.unwrap_or_default();
720
721 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
724 (Some(global), Some(session)) => {
725 let global = Arc::clone(global);
726 let session_mgr = Arc::clone(session);
727 match tokio::runtime::Handle::try_current() {
728 Ok(handle) => {
729 let global_for_merge = Arc::clone(&global);
730 tokio::task::block_in_place(|| {
731 handle.block_on(async move {
732 for config in session_mgr.all_configs().await {
733 let name = config.name.clone();
734 global_for_merge.register_server(config).await;
735 if let Err(e) = global_for_merge.connect(&name).await {
736 tracing::warn!(
737 server = %name,
738 error = %e,
739 "Failed to connect session-level MCP server — skipping"
740 );
741 }
742 }
743 })
744 });
745 }
746 Err(_) => {
747 tracing::warn!(
748 "No async runtime available to merge session-level MCP servers \
749 into global manager — session MCP servers will not be available"
750 );
751 }
752 }
753 SessionOptions {
754 mcp_manager: Some(Arc::clone(&global)),
755 ..opts
756 }
757 }
758 (Some(global), None) => SessionOptions {
759 mcp_manager: Some(Arc::clone(global)),
760 ..opts
761 },
762 _ => opts,
763 };
764
765 let session_id = merged_opts
766 .session_id
767 .clone()
768 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
769 merged_opts.session_id = Some(session_id.clone());
770 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
771
772 self.build_session(workspace.into(), llm_client, &merged_opts)
773 }
774
775 pub fn session_for_agent(
790 &self,
791 workspace: impl Into<String>,
792 def: &crate::subagent::AgentDefinition,
793 extra: Option<SessionOptions>,
794 ) -> Result<AgentSession> {
795 let mut opts = extra.unwrap_or_default();
796
797 if opts.permission_checker.is_none()
799 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
800 {
801 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
802 }
803
804 if opts.max_tool_rounds.is_none() {
806 if let Some(steps) = def.max_steps {
807 opts.max_tool_rounds = Some(steps);
808 }
809 }
810
811 if opts.model.is_none() {
813 if let Some(ref m) = def.model {
814 let provider = m.provider.as_deref().unwrap_or("anthropic");
815 opts.model = Some(format!("{}/{}", provider, m.model));
816 }
817 }
818
819 if let Some(ref prompt) = def.prompt {
826 let slots = opts
827 .prompt_slots
828 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
829 if slots.extra.is_none() {
830 slots.extra = Some(prompt.clone());
831 }
832 }
833
834 self.session(workspace, Some(opts))
835 }
836
837 pub fn resume_session(
845 &self,
846 session_id: &str,
847 options: SessionOptions,
848 ) -> Result<AgentSession> {
849 let store = options.session_store.clone().ok_or_else(|| {
850 crate::error::CodeError::Session(
851 "resume_session requires a session_store in SessionOptions".to_string(),
852 )
853 })?;
854
855 let data = match tokio::runtime::Handle::try_current() {
857 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
858 .map_err(|e| {
859 crate::error::CodeError::Session(format!(
860 "Failed to load session {}: {}",
861 session_id, e
862 ))
863 })?,
864 Err(_) => {
865 return Err(crate::error::CodeError::Session(
866 "No async runtime available for session resume".to_string(),
867 ))
868 }
869 };
870
871 let data = data.ok_or_else(|| {
872 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
873 })?;
874
875 let mut opts = options;
877 opts.session_id = Some(data.id.clone());
878 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
879
880 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
881
882 *write_or_recover(&session.history) = data.messages;
884 let artifacts = match tokio::runtime::Handle::try_current() {
885 Ok(handle) => {
886 tokio::task::block_in_place(|| handle.block_on(store.load_artifacts(&data.id)))
887 .map_err(|e| {
888 crate::error::CodeError::Session(format!(
889 "Failed to load artifacts for session {}: {}",
890 data.id, e
891 ))
892 })?
893 }
894 Err(_) => None,
895 };
896 if let Some(artifacts) = artifacts {
897 let target_store = session.tool_executor.artifact_store();
898 for artifact in artifacts.artifacts() {
899 target_store.put(artifact);
900 }
901 }
902
903 let trace_events = match tokio::runtime::Handle::try_current() {
904 Ok(handle) => {
905 tokio::task::block_in_place(|| handle.block_on(store.load_trace_events(&data.id)))
906 .map_err(|e| {
907 crate::error::CodeError::Session(format!(
908 "Failed to load trace events for session {}: {}",
909 data.id, e
910 ))
911 })?
912 }
913 Err(_) => None,
914 };
915 if let Some(events) = trace_events {
916 session.trace_sink.replace_events(events);
917 }
918
919 let run_records = match tokio::runtime::Handle::try_current() {
920 Ok(handle) => {
921 tokio::task::block_in_place(|| handle.block_on(store.load_run_records(&data.id)))
922 .map_err(|e| {
923 crate::error::CodeError::Session(format!(
924 "Failed to load run records for session {}: {}",
925 data.id, e
926 ))
927 })?
928 }
929 Err(_) => None,
930 };
931 if let Some(records) = run_records {
932 if let Ok(handle) = tokio::runtime::Handle::try_current() {
933 tokio::task::block_in_place(|| {
934 handle.block_on(session.run_store.replace_records(records))
935 });
936 }
937 }
938
939 let verification_reports = match tokio::runtime::Handle::try_current() {
940 Ok(handle) => tokio::task::block_in_place(|| {
941 handle.block_on(store.load_verification_reports(&data.id))
942 })
943 .map_err(|e| {
944 crate::error::CodeError::Session(format!(
945 "Failed to load verification reports for session {}: {}",
946 data.id, e
947 ))
948 })?,
949 Err(_) => None,
950 };
951 if let Some(reports) = verification_reports {
952 *write_or_recover(&session.verification_reports) = reports;
953 }
954
955 Ok(session)
956 }
957
958 fn resolve_session_llm_client(
959 &self,
960 opts: &SessionOptions,
961 session_id: Option<&str>,
962 ) -> Result<Arc<dyn LlmClient>> {
963 let model_ref = if let Some(ref model) = opts.model {
964 model.as_str()
965 } else {
966 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
967 tracing::warn!(
968 "temperature/thinking_budget set without model override — these will be ignored. \
969 Use with_model() to apply LLM parameter overrides."
970 );
971 }
972 self.code_config
973 .default_model
974 .as_deref()
975 .context("default_model must be set in 'provider/model' format")?
976 };
977
978 let (provider_name, model_id) = model_ref
979 .split_once('/')
980 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
981
982 let mut llm_config = self
983 .code_config
984 .llm_config(provider_name, model_id)
985 .with_context(|| {
986 format!("provider '{provider_name}' or model '{model_id}' not found in config")
987 })?;
988
989 if opts.model.is_some() {
990 if let Some(temp) = opts.temperature {
991 llm_config = llm_config.with_temperature(temp);
992 }
993 if let Some(budget) = opts.thinking_budget {
994 llm_config = llm_config.with_thinking_budget(budget);
995 }
996 }
997
998 if let Some(session_id) = session_id {
999 llm_config = llm_config.with_session_id(session_id);
1000 }
1001
1002 Ok(crate::llm::create_client_with_config(llm_config))
1003 }
1004
1005 fn build_session(
1006 &self,
1007 workspace: String,
1008 llm_client: Arc<dyn LlmClient>,
1009 opts: &SessionOptions,
1010 ) -> Result<AgentSession> {
1011 let canonical = safe_canonicalize(Path::new(&workspace));
1012
1013 let artifact_limits = opts.artifact_store_limits.unwrap_or_default();
1014 let tool_executor = Arc::new(ToolExecutor::new_with_artifact_limits(
1015 canonical.display().to_string(),
1016 artifact_limits,
1017 ));
1018 let trace_sink = crate::trace::InMemoryTraceSink::default();
1019 tool_executor.set_trace_sink(Arc::new(trace_sink.clone()));
1020
1021 if let Some(ref search_config) = self.code_config.search {
1023 tool_executor
1024 .registry()
1025 .set_search_config(search_config.clone());
1026 }
1027
1028 let agent_registry = {
1032 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1033 use crate::tools::register_task_with_mcp;
1034 let registry = AgentRegistry::new();
1035 for dir in self
1036 .code_config
1037 .agent_dirs
1038 .iter()
1039 .chain(opts.agent_dirs.iter())
1040 {
1041 for agent in load_agents_from_dir(dir) {
1042 registry.register(agent);
1043 }
1044 }
1045 let registry = Arc::new(registry);
1046 register_task_with_mcp(
1047 tool_executor.registry(),
1048 Arc::clone(&llm_client),
1049 Arc::clone(®istry),
1050 canonical.display().to_string(),
1051 opts.mcp_manager.clone(),
1052 );
1053 registry
1054 };
1055
1056 if let Some(ref mcp) = opts.mcp_manager {
1059 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1062 Arc::as_ptr(mcp),
1063 self.global_mcp
1064 .as_ref()
1065 .map(Arc::as_ptr)
1066 .unwrap_or(std::ptr::null()),
1067 ) {
1068 self.global_mcp_tools
1070 .lock()
1071 .expect("global_mcp_tools lock poisoned")
1072 .clone()
1073 } else {
1074 match tokio::runtime::Handle::try_current() {
1076 Ok(handle) => {
1077 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1078 }
1079 Err(_) => {
1080 tracing::warn!(
1081 "No async runtime available for session-level MCP tools — \
1082 MCP tools will not be registered"
1083 );
1084 vec![]
1085 }
1086 }
1087 };
1088
1089 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1090 std::collections::HashMap::new();
1091 for (server, tool) in all_tools {
1092 by_server.entry(server).or_default().push(tool);
1093 }
1094 for (server_name, tools) in by_server {
1095 for tool in
1096 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1097 {
1098 tool_executor.register_dynamic_tool(tool);
1099 }
1100 }
1101 }
1102
1103 let tool_defs = tool_executor.definitions();
1104
1105 let prompt_slots = opts
1107 .prompt_slots
1108 .clone()
1109 .unwrap_or_else(|| self.config.prompt_slots.clone());
1110
1111 let mut context_providers = opts.context_providers.clone();
1112
1113 let agents_md_path = canonical.join("AGENTS.md");
1115 if agents_md_path.exists() && agents_md_path.is_file() {
1116 match std::fs::read_to_string(&agents_md_path) {
1117 Ok(content) if !content.trim().is_empty() => {
1118 tracing::info!(
1119 path = %agents_md_path.display(),
1120 "Auto-loaded AGENTS.md from workspace root"
1121 );
1122 let token_count = content.split_whitespace().count().max(1);
1123 let item = ContextItem::new(
1124 "agents_md",
1125 ContextType::Resource,
1126 format!("# Project Instructions (AGENTS.md)\n\n{}", content),
1127 )
1128 .with_source(format!("file://{}", agents_md_path.display()))
1129 .with_provenance("workspace_instructions")
1130 .with_priority(0.95)
1131 .with_trust(0.95)
1132 .with_freshness(1.0)
1133 .with_relevance(0.95)
1134 .with_token_count(token_count);
1135
1136 context_providers.push(Arc::new(
1137 StaticContextProvider::new("agents_md").with_item(item),
1138 ));
1139 }
1140 Ok(_) => {
1141 tracing::debug!(
1142 path = %agents_md_path.display(),
1143 "AGENTS.md exists but is empty — skipping"
1144 );
1145 }
1146 Err(e) => {
1147 tracing::warn!(
1148 path = %agents_md_path.display(),
1149 error = %e,
1150 "Failed to read AGENTS.md — skipping"
1151 );
1152 }
1153 }
1154 }
1155
1156 let base_registry = self
1160 .config
1161 .skill_registry
1162 .as_deref()
1163 .map(|r| r.fork())
1164 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1165 if let Some(ref r) = opts.skill_registry {
1167 for skill in r.all() {
1168 base_registry.register_unchecked(skill);
1169 }
1170 }
1171 for dir in &opts.skill_dirs {
1173 if let Err(e) = base_registry.load_from_dir(dir) {
1174 tracing::warn!(
1175 dir = %dir.display(),
1176 error = %e,
1177 "Failed to load session skill dir — skipping"
1178 );
1179 }
1180 }
1181 let effective_registry = Arc::new(base_registry);
1182
1183 let skill_prompt = effective_registry.to_system_prompt();
1185 if !skill_prompt.is_empty() {
1186 let item = ContextItem::new("skills_catalog", ContextType::Skill, skill_prompt)
1187 .with_source("a3s://skills/catalog")
1188 .with_provenance("skill_registry")
1189 .with_priority(0.85)
1190 .with_trust(0.9)
1191 .with_freshness(1.0)
1192 .with_relevance(1.0);
1193 context_providers.push(Arc::new(
1194 StaticContextProvider::new("skills_catalog").with_item(item),
1195 ));
1196 }
1197
1198 let mut init_warning: Option<String> = None;
1200 let memory = {
1201 let store = if let Some(ref store) = opts.memory_store {
1202 Some(Arc::clone(store))
1203 } else if let Some(ref dir) = opts.file_memory_dir {
1204 match tokio::runtime::Handle::try_current() {
1205 Ok(handle) => {
1206 let dir = dir.clone();
1207 match tokio::task::block_in_place(|| {
1208 handle.block_on(FileMemoryStore::new(dir))
1209 }) {
1210 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1211 Err(e) => {
1212 let msg = format!("Failed to create file memory store: {}", e);
1213 tracing::warn!("{}", msg);
1214 init_warning = Some(msg);
1215 None
1216 }
1217 }
1218 }
1219 Err(_) => {
1220 let msg =
1221 "No async runtime available for file memory store — memory disabled"
1222 .to_string();
1223 tracing::warn!("{}", msg);
1224 init_warning = Some(msg);
1225 None
1226 }
1227 }
1228 } else {
1229 None
1230 };
1231 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1232 };
1233
1234 let base = self.config.clone();
1235 let config = AgentConfig {
1236 prompt_slots,
1237 tools: tool_defs,
1238 security_provider: opts.security_provider.clone(),
1239 permission_checker: opts.permission_checker.clone(),
1240 confirmation_manager: opts.confirmation_manager.clone(),
1241 context_providers,
1242 planning_mode: opts.planning_mode,
1243 goal_tracking: opts.goal_tracking,
1244 skill_registry: Some(Arc::clone(&effective_registry)),
1245 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1246 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1247 circuit_breaker_threshold: opts
1248 .circuit_breaker_threshold
1249 .unwrap_or(base.circuit_breaker_threshold),
1250 auto_compact: opts.auto_compact,
1251 auto_compact_threshold: opts
1252 .auto_compact_threshold
1253 .unwrap_or(crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD),
1254 max_context_tokens: base.max_context_tokens,
1255 memory: memory.clone(),
1256 continuation_enabled: opts
1257 .continuation_enabled
1258 .unwrap_or(base.continuation_enabled),
1259 max_continuation_turns: opts
1260 .max_continuation_turns
1261 .unwrap_or(base.max_continuation_turns),
1262 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1263 ..base
1264 };
1265
1266 {
1270 use crate::tools::register_skill;
1271 register_skill(
1272 tool_executor.registry(),
1273 Arc::clone(&llm_client),
1274 Arc::clone(&effective_registry),
1275 Arc::clone(&tool_executor),
1276 config.clone(),
1277 );
1278 }
1279
1280 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1283 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1284 let session_id = uuid::Uuid::new_v4().to_string();
1285 let rt = tokio::runtime::Handle::try_current();
1286
1287 match rt {
1288 Ok(handle) => {
1289 let queue = tokio::task::block_in_place(|| {
1291 handle.block_on(SessionLaneQueue::new(
1292 &session_id,
1293 queue_config.clone(),
1294 agent_event_tx.clone(),
1295 ))
1296 });
1297 match queue {
1298 Ok(q) => {
1299 let q = Arc::new(q);
1301 let q2 = Arc::clone(&q);
1302 tokio::task::block_in_place(|| {
1303 handle.block_on(async { q2.start().await.ok() })
1304 });
1305 Some(q)
1306 }
1307 Err(e) => {
1308 tracing::warn!("Failed to create session lane queue: {}", e);
1309 None
1310 }
1311 }
1312 }
1313 Err(_) => {
1314 tracing::warn!(
1315 "No async runtime available for queue creation — queue disabled"
1316 );
1317 None
1318 }
1319 }
1320 } else {
1321 None
1322 };
1323
1324 let mut tool_context = ToolContext::new(canonical.clone());
1326 if let Some(ref search_config) = self.code_config.search {
1327 tool_context = tool_context.with_search_config(search_config.clone());
1328 }
1329 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1330
1331 if let Some(handle) = opts.sandbox_handle.clone() {
1333 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1334 tool_context = tool_context.with_sandbox(handle);
1335 }
1336
1337 let session_id = opts
1338 .session_id
1339 .clone()
1340 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1341
1342 let session_store = if opts.session_store.is_some() {
1344 opts.session_store.clone()
1345 } else if let Some(ref dir) = self.code_config.sessions_dir {
1346 match tokio::runtime::Handle::try_current() {
1347 Ok(handle) => {
1348 let dir = dir.clone();
1349 match tokio::task::block_in_place(|| {
1350 handle.block_on(crate::store::FileSessionStore::new(dir))
1351 }) {
1352 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1353 Err(e) => {
1354 tracing::warn!(
1355 "Failed to create session store from sessions_dir: {}",
1356 e
1357 );
1358 None
1359 }
1360 }
1361 }
1362 Err(_) => {
1363 tracing::warn!(
1364 "No async runtime for sessions_dir store — persistence disabled"
1365 );
1366 None
1367 }
1368 }
1369 } else {
1370 None
1371 };
1372
1373 let command_registry = CommandRegistry::new();
1374
1375 Ok(AgentSession {
1376 llm_client,
1377 tool_executor,
1378 tool_context,
1379 memory: config.memory.clone(),
1380 config,
1381 workspace: canonical,
1382 session_id,
1383 history: Arc::new(RwLock::new(Vec::new())),
1384 command_queue,
1385 session_store,
1386 auto_save: opts.auto_save,
1387 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1388 ahp_executor: opts.hook_executor.clone(),
1389 init_warning,
1390 command_registry: std::sync::Mutex::new(command_registry),
1391 model_name: opts
1392 .model
1393 .clone()
1394 .or_else(|| self.code_config.default_model.clone())
1395 .unwrap_or_else(|| "unknown".to_string()),
1396 mcp_manager: opts
1397 .mcp_manager
1398 .clone()
1399 .or_else(|| self.global_mcp.clone())
1400 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1401 agent_registry,
1402 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1403 current_run_id: Arc::new(tokio::sync::Mutex::new(None)),
1404 run_store: Arc::new(crate::run::InMemoryRunStore::new()),
1405 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1406 trace_sink,
1407 verification_reports: Arc::new(RwLock::new(Vec::new())),
1408 })
1409 }
1410}
1411
/// Outcome of a side question answered by [`AgentSession::btw`] or
/// [`AgentSession::btw_with_context`].
#[derive(Debug, Clone)]
pub struct BtwResult {
    /// The question that was asked, with surrounding whitespace trimmed.
    pub question: String,
    /// Text of the model's response.
    pub answer: String,
    /// Token usage reported on the LLM response for this call.
    pub usage: crate::llm::TokenUsage,
}
1429
/// A single agent session: one workspace, one conversation history, and the
/// runtime state needed to run, stream, observe, and cancel agent executions.
pub struct AgentSession {
    /// LLM client handed to every agent loop and used directly for side questions.
    llm_client: Arc<dyn LlmClient>,
    /// Shared tool executor; its current definitions are copied into each loop's config.
    tool_executor: Arc<ToolExecutor>,
    /// Tool context cloned into every agent loop.
    tool_context: ToolContext,
    /// Base agent configuration, cloned (and adjusted) per agent loop.
    config: AgentConfig,
    /// Workspace directory of this session.
    workspace: PathBuf,
    /// Identifier of this session.
    session_id: String,
    /// Internal conversation history; replaced with a run's messages when the
    /// caller did not supply its own history.
    history: Arc<RwLock<Vec<Message>>>,
    /// Optional lane queue attached to agent loops when present.
    command_queue: Option<Arc<SessionLaneQueue>>,
    /// Optional agent memory exposed through [`AgentSession::memory`].
    memory: Option<Arc<crate::memory::AgentMemory>>,
    /// Optional store used to persist the session.
    session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// When `true`, the session is saved after a successful run on internal history.
    auto_save: bool,
    /// Hook engine used as the loop's hook executor when `ahp_executor` is `None`.
    hook_engine: Arc<crate::hooks::HookExecutor>,
    /// Optional hook executor; takes precedence over `hook_engine` and also
    /// receives runtime events and cancellation notifications.
    ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Warning captured during initialization; logged at the start of `send`.
    init_warning: Option<String>,
    /// Slash-command registry, guarded by a standard mutex.
    command_registry: std::sync::Mutex<CommandRegistry>,
    /// Model name reported in the slash-command context.
    model_name: String,
    /// MCP manager associated with this session.
    mcp_manager: Arc<crate::mcp::manager::McpManager>,
    /// Registry of sub-agent definitions.
    agent_registry: Arc<crate::subagent::AgentRegistry>,
    /// Cancellation token of the in-flight run, or `None` when idle.
    cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
    /// Identifier of the in-flight run, or `None` when idle.
    current_run_id: Arc<tokio::sync::Mutex<Option<String>>>,
    /// In-memory record of runs and their events.
    run_store: Arc<crate::run::InMemoryRunStore>,
    /// Tools currently executing, keyed by tool-call id.
    active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
    /// In-memory trace sink whose events are persisted on save.
    trace_sink: crate::trace::InMemoryTraceSink,
    /// Verification reports accumulated across runs.
    verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
}
1487
/// Bookkeeping entry for a tool call that has started but not yet finished.
#[derive(Debug, Clone)]
struct ActiveToolSnapshot {
    /// Name of the running tool.
    tool_name: String,
    /// Wall-clock start time in milliseconds since the Unix epoch.
    started_at_ms: u64,
}
1493
/// Owned snapshot of the session state needed to record and persist a run's
/// result from a spawned task, without borrowing the [`AgentSession`] itself.
#[derive(Clone)]
struct SessionPersistenceContext {
    /// Store to persist into; saving is a no-op when this is `None`.
    session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// Identifier of the session being persisted.
    session_id: String,
    /// Workspace path recorded in the saved session config.
    workspace: PathBuf,
    /// Agent configuration used to derive the saved prompt, modes, and tool names.
    config: AgentConfig,
    /// Executor whose artifact store is persisted alongside the session.
    tool_executor: Arc<ToolExecutor>,
    /// Trace sink whose events are persisted alongside the session.
    trace_sink: crate::trace::InMemoryTraceSink,
    /// Run store whose records are persisted alongside the session.
    run_store: Arc<crate::run::InMemoryRunStore>,
    /// Shared handle to the session's conversation history.
    history: Arc<RwLock<Vec<Message>>>,
    /// Shared handle to the session's accumulated verification reports.
    verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
    /// Whether `auto_save_if_enabled` should actually save.
    auto_save: bool,
}
1507
1508impl SessionPersistenceContext {
1509 fn from_session(session: &AgentSession) -> Self {
1510 Self {
1511 session_store: session.session_store.clone(),
1512 session_id: session.session_id.clone(),
1513 workspace: session.workspace.clone(),
1514 config: session.config.clone(),
1515 tool_executor: Arc::clone(&session.tool_executor),
1516 trace_sink: session.trace_sink.clone(),
1517 run_store: Arc::clone(&session.run_store),
1518 history: Arc::clone(&session.history),
1519 verification_reports: Arc::clone(&session.verification_reports),
1520 auto_save: session.auto_save,
1521 }
1522 }
1523
1524 fn record_result(&self, result: &AgentResult) {
1525 *write_or_recover(&self.history) = result.messages.clone();
1526 if !result.verification_reports.is_empty() {
1527 write_or_recover(&self.verification_reports)
1528 .extend(result.verification_reports.clone());
1529 }
1530 }
1531
1532 async fn save(&self) -> Result<()> {
1533 let store = match &self.session_store {
1534 Some(store) => store,
1535 None => return Ok(()),
1536 };
1537
1538 let history = read_or_recover(&self.history).clone();
1539 let verification_reports = read_or_recover(&self.verification_reports).clone();
1540 let now = chrono::Utc::now().timestamp();
1541
1542 let data = crate::store::SessionData {
1543 id: self.session_id.clone(),
1544 config: crate::store::SessionConfig {
1545 name: String::new(),
1546 workspace: self.workspace.display().to_string(),
1547 system_prompt: Some(self.config.prompt_slots.build()),
1548 max_context_length: 200_000,
1549 auto_compact: false,
1550 auto_compact_threshold: crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD,
1551 storage_type: crate::config::StorageBackend::File,
1552 queue_config: None,
1553 confirmation_policy: None,
1554 permission_policy: None,
1555 parent_id: None,
1556 security_config: None,
1557 hook_engine: None,
1558 planning_mode: self.config.planning_mode,
1559 goal_tracking: self.config.goal_tracking,
1560 },
1561 state: crate::store::SessionState::Active,
1562 messages: history,
1563 context_usage: crate::store::ContextUsage::default(),
1564 total_usage: crate::llm::TokenUsage::default(),
1565 total_cost: 0.0,
1566 model_name: None,
1567 cost_records: Vec::new(),
1568 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1569 thinking_enabled: false,
1570 thinking_budget: None,
1571 created_at: now,
1572 updated_at: now,
1573 llm_config: None,
1574 tasks: Vec::new(),
1575 parent_id: None,
1576 };
1577
1578 store.save(&data).await?;
1579 store
1580 .save_artifacts(&self.session_id, &self.tool_executor.artifact_store())
1581 .await?;
1582 store
1583 .save_trace_events(&self.session_id, &self.trace_sink.events())
1584 .await?;
1585 store
1586 .save_run_records(&self.session_id, &self.run_store.records().await)
1587 .await?;
1588 store
1589 .save_verification_reports(&self.session_id, &verification_reports)
1590 .await?;
1591 tracing::debug!("Session {} saved", self.session_id);
1592 Ok(())
1593 }
1594
1595 async fn auto_save_if_enabled(&self) {
1596 if self.auto_save {
1597 if let Err(e) = self.save().await {
1598 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1599 }
1600 }
1601 }
1602}
1603
1604impl std::fmt::Debug for AgentSession {
1605 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1606 f.debug_struct("AgentSession")
1607 .field("session_id", &self.session_id)
1608 .field("workspace", &self.workspace.display().to_string())
1609 .field("auto_save", &self.auto_save)
1610 .finish()
1611 }
1612}
1613
1614impl AgentSession {
/// Returns the current wall-clock time in milliseconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch, and
/// saturates at `u64::MAX` instead of silently truncating the 128-bit
/// millisecond count.
fn now_ms() -> u64 {
    std::time::SystemTime::now()
        .duration_since(std::time::UNIX_EPOCH)
        .map_or(0, |elapsed| {
            u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX)
        })
}
1621
1622 fn compact_json_value(value: &serde_json::Value) -> String {
1623 let raw = match value {
1624 serde_json::Value::Null => String::new(),
1625 serde_json::Value::String(s) => s.clone(),
1626 _ => serde_json::to_string(value).unwrap_or_default(),
1627 };
1628 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1629 if compact.len() > 180 {
1630 format!("{}...", truncate_utf8(&compact, 180))
1631 } else {
1632 compact
1633 }
1634 }
1635
1636 async fn start_run(&self, prompt: &str) -> crate::run::RunHandle {
1637 let snapshot = self.run_store.create_run(&self.session_id, prompt).await;
1638 *self.current_run_id.lock().await = Some(snapshot.id.clone());
1639 crate::run::RunHandle::new(
1640 snapshot.id,
1641 self.session_id.clone(),
1642 Arc::clone(&self.run_store),
1643 Arc::clone(&self.cancel_token),
1644 Arc::clone(&self.current_run_id),
1645 self.ahp_executor.clone(),
1646 )
1647 }
1648
1649 async fn finish_run_if_current(&self, run_id: &str) {
1650 let mut current = self.current_run_id.lock().await;
1651 if current.as_deref() == Some(run_id) {
1652 *current = None;
1653 }
1654 }
1655
1656 async fn record_runtime_event(
1657 run_store: &Arc<crate::run::InMemoryRunStore>,
1658 run_id: &str,
1659 session_id: &str,
1660 hook_executor: &Option<Arc<dyn crate::hooks::HookExecutor>>,
1661 event: &AgentEvent,
1662 ) {
1663 let _ = run_store.record_event(run_id, event.clone()).await;
1664 if let Some(executor) = hook_executor {
1665 executor.record_agent_event(event, run_id, session_id).await;
1666 }
1667 }
1668
1669 async fn apply_runtime_event(
1670 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1671 event: &AgentEvent,
1672 ) {
1673 match event {
1674 AgentEvent::ToolStart { id, name } => {
1675 active_tools.write().await.insert(
1676 id.clone(),
1677 ActiveToolSnapshot {
1678 tool_name: name.clone(),
1679 started_at_ms: Self::now_ms(),
1680 },
1681 );
1682 }
1683 AgentEvent::ToolEnd { id, .. }
1684 | AgentEvent::PermissionDenied { tool_id: id, .. }
1685 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1686 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1687 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1688 active_tools.write().await.remove(id);
1689 }
1690 _ => {}
1691 }
1692 }
1693
/// Empties the active-tool table so no tool is reported as still running.
async fn clear_runtime_tracking(&self) {
    self.active_tools.write().await.clear();
}
1697
1698 fn build_agent_loop(&self) -> AgentLoop {
1702 let mut config = self.config.clone();
1703 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1704 ahp.clone()
1705 } else {
1706 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1707 });
1708 config.tools = self.tool_executor.definitions();
1712 let mut agent_loop = AgentLoop::new(
1713 self.llm_client.clone(),
1714 self.tool_executor.clone(),
1715 self.tool_context.clone(),
1716 config,
1717 );
1718 if let Some(ref queue) = self.command_queue {
1719 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1720 }
1721 agent_loop
1722 }
1723
1724 fn build_command_context(&self) -> CommandContext {
1726 let history = read_or_recover(&self.history);
1727
1728 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1730
1731 let mut mcp_map: std::collections::HashMap<String, usize> =
1733 std::collections::HashMap::new();
1734 for name in &tool_names {
1735 if let Some(rest) = name.strip_prefix("mcp__") {
1736 if let Some((server, _)) = rest.split_once("__") {
1737 *mcp_map.entry(server.to_string()).or_default() += 1;
1738 }
1739 }
1740 }
1741 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1742 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1743
1744 CommandContext {
1745 session_id: self.session_id.clone(),
1746 workspace: self.workspace.display().to_string(),
1747 model: self.model_name.clone(),
1748 history_len: history.len(),
1749 total_tokens: 0,
1750 total_cost: 0.0,
1751 tool_names,
1752 mcp_servers,
1753 }
1754 }
1755
1756 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1760 self.command_registry
1761 .lock()
1762 .expect("command_registry lock poisoned")
1763 }
1764
1765 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1769 self.command_registry
1770 .lock()
1771 .expect("command_registry lock poisoned")
1772 .register(cmd);
1773 }
1774
/// Closes the session by cancelling any in-flight operation.
///
/// Whether anything was actually cancelled is ignored.
pub async fn close(&self) {
    let _ = self.cancel().await;
}
1779
1780 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1789 if CommandRegistry::is_command(prompt) {
1791 let ctx = self.build_command_context();
1792 let output = self.command_registry().dispatch(prompt, &ctx);
1793 if let Some(output) = output {
1795 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1797 let result = self.btw(question).await?;
1798 return Ok(AgentResult {
1799 text: result.answer,
1800 messages: history
1801 .map(|h| h.to_vec())
1802 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1803 tool_calls_count: 0,
1804 usage: result.usage,
1805 verification_reports: Vec::new(),
1806 });
1807 }
1808 return Ok(AgentResult {
1809 text: output.text,
1810 messages: history
1811 .map(|h| h.to_vec())
1812 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1813 tool_calls_count: 0,
1814 usage: crate::llm::TokenUsage::default(),
1815 verification_reports: Vec::new(),
1816 });
1817 }
1818 }
1819
1820 if let Some(ref w) = self.init_warning {
1821 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1822 }
1823 let run = self.start_run(prompt).await;
1824 let run_id = run.id().to_string();
1825 let agent_loop = self.build_agent_loop();
1826 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1827 let runtime_state = Arc::clone(&self.active_tools);
1828 let run_store = Arc::clone(&self.run_store);
1829 let collector_run_id = run_id.clone();
1830 let collector_session_id = self.session_id.clone();
1831 let collector_hook_executor = self.ahp_executor.clone();
1832 let runtime_collector = tokio::spawn(async move {
1833 while let Some(event) = runtime_rx.recv().await {
1834 AgentSession::record_runtime_event(
1835 &run_store,
1836 &collector_run_id,
1837 &collector_session_id,
1838 &collector_hook_executor,
1839 &event,
1840 )
1841 .await;
1842 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1843 }
1844 });
1845
1846 let use_internal = history.is_none();
1847 let effective_history = match history {
1848 Some(h) => h.to_vec(),
1849 None => read_or_recover(&self.history).clone(),
1850 };
1851
1852 let cancel_token = tokio_util::sync::CancellationToken::new();
1853 *self.cancel_token.lock().await = Some(cancel_token.clone());
1854 let result = agent_loop
1855 .execute_with_session(
1856 &effective_history,
1857 prompt,
1858 Some(&self.session_id),
1859 Some(runtime_tx),
1860 Some(&cancel_token),
1861 )
1862 .await;
1863 *self.cancel_token.lock().await = None;
1864 let _ = runtime_collector.await;
1865 let result = match result {
1866 Ok(result) => result,
1867 Err(error) => {
1868 let _ = self.run_store.mark_failed(&run_id, error.to_string()).await;
1869 self.clear_runtime_tracking().await;
1870 self.finish_run_if_current(&run_id).await;
1871 return Err(error.into());
1872 }
1873 };
1874
1875 if use_internal {
1878 *write_or_recover(&self.history) = result.messages.clone();
1879 self.record_verification_reports(result.verification_reports.clone());
1880
1881 if self.auto_save {
1883 if let Err(e) = self.save().await {
1884 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1885 }
1886 }
1887 }
1888
1889 self.clear_runtime_tracking().await;
1890 self.finish_run_if_current(&run_id).await;
1891
1892 Ok(result)
1893 }
1894
1895 async fn build_btw_runtime_context(&self) -> String {
1896 let mut sections = Vec::new();
1897
1898 let active_tools = {
1899 let tools = self.active_tools.read().await;
1900 let mut items = tools
1901 .iter()
1902 .map(|(tool_id, tool)| {
1903 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1904 format!(
1905 "- {} [{}] running_for={}ms",
1906 tool.tool_name, tool_id, elapsed_ms
1907 )
1908 })
1909 .collect::<Vec<_>>();
1910 items.sort();
1911 items
1912 };
1913 if !active_tools.is_empty() {
1914 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1915 }
1916
1917 if let Some(cm) = &self.config.confirmation_manager {
1918 let pending = cm.pending_confirmations().await;
1919 if !pending.is_empty() {
1920 let mut lines = pending
1921 .into_iter()
1922 .map(
1923 |PendingConfirmationInfo {
1924 tool_id,
1925 tool_name,
1926 args,
1927 remaining_ms,
1928 }| {
1929 let arg_summary = Self::compact_json_value(&args);
1930 if arg_summary.is_empty() {
1931 format!(
1932 "- {} [{}] remaining={}ms",
1933 tool_name, tool_id, remaining_ms
1934 )
1935 } else {
1936 format!(
1937 "- {} [{}] remaining={}ms {}",
1938 tool_name, tool_id, remaining_ms, arg_summary
1939 )
1940 }
1941 },
1942 )
1943 .collect::<Vec<_>>();
1944 lines.sort();
1945 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1946 }
1947 }
1948
1949 if let Some(queue) = &self.command_queue {
1950 let stats = queue.stats().await;
1951 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1952 let mut lines = vec![format!(
1953 "active={}, pending={}, external_pending={}",
1954 stats.total_active, stats.total_pending, stats.external_pending
1955 )];
1956 let mut lanes = stats
1957 .lanes
1958 .into_values()
1959 .filter(|lane| lane.active > 0 || lane.pending > 0)
1960 .map(|lane| {
1961 format!(
1962 "- {:?}: active={}, pending={}, handler={:?}",
1963 lane.lane, lane.active, lane.pending, lane.handler_mode
1964 )
1965 })
1966 .collect::<Vec<_>>();
1967 lanes.sort();
1968 lines.extend(lanes);
1969 sections.push(format!("[session queue]\n{}", lines.join("\n")));
1970 }
1971
1972 let external_tasks = queue.pending_external_tasks().await;
1973 if !external_tasks.is_empty() {
1974 let mut lines = external_tasks
1975 .into_iter()
1976 .take(6)
1977 .map(|task| {
1978 let payload_summary = Self::compact_json_value(&task.payload);
1979 if payload_summary.is_empty() {
1980 format!(
1981 "- {} {:?} remaining={}ms",
1982 task.command_type,
1983 task.lane,
1984 task.remaining_ms()
1985 )
1986 } else {
1987 format!(
1988 "- {} {:?} remaining={}ms {}",
1989 task.command_type,
1990 task.lane,
1991 task.remaining_ms(),
1992 payload_summary
1993 )
1994 }
1995 })
1996 .collect::<Vec<_>>();
1997 lines.sort();
1998 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
1999 }
2000 }
2001
2002 if let Some(store) = &self.session_store {
2003 if let Ok(Some(session)) = store.load(&self.session_id).await {
2004 let active_tasks = session
2005 .tasks
2006 .into_iter()
2007 .filter(|task| task.status.is_active())
2008 .take(6)
2009 .map(|task| match task.tool {
2010 Some(tool) if !tool.is_empty() => {
2011 format!("- [{}] {} ({})", task.status, task.content, tool)
2012 }
2013 _ => format!("- [{}] {}", task.status, task.content),
2014 })
2015 .collect::<Vec<_>>();
2016 if !active_tasks.is_empty() {
2017 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
2018 }
2019 }
2020 }
2021
2022 sections.join("\n\n")
2023 }
2024
/// Asks a side question without supplying any host runtime context.
///
/// Equivalent to calling [`AgentSession::btw_with_context`] with `None`.
///
/// # Errors
///
/// Returns whatever `btw_with_context` returns for the same question.
pub async fn btw(&self, question: &str) -> Result<BtwResult> {
    self.btw_with_context(question, None).await
}
2045
2046 pub async fn btw_with_context(
2051 &self,
2052 question: &str,
2053 runtime_context: Option<&str>,
2054 ) -> Result<BtwResult> {
2055 let question = question.trim();
2056 if question.is_empty() {
2057 return Err(crate::error::CodeError::Session(
2058 "btw: question cannot be empty".to_string(),
2059 ));
2060 }
2061
2062 let history_snapshot = read_or_recover(&self.history).clone();
2064
2065 let mut messages = history_snapshot;
2067 let mut injected_sections = Vec::new();
2068 let session_runtime = self.build_btw_runtime_context().await;
2069 if !session_runtime.is_empty() {
2070 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
2071 }
2072 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
2073 injected_sections.push(format!("[host runtime context]\n{}", extra));
2074 }
2075 if !injected_sections.is_empty() {
2076 let injected_context = format!(
2077 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
2078 injected_sections.join("\n\n")
2079 );
2080 messages.push(Message::user(&injected_context));
2081 }
2082 messages.push(Message::user(question));
2083
2084 let response = self
2085 .llm_client
2086 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2087 .await
2088 .map_err(|e| {
2089 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2090 })?;
2091
2092 Ok(BtwResult {
2093 question: question.to_string(),
2094 answer: response.text(),
2095 usage: response.usage,
2096 })
2097 }
2098
2099 pub async fn send_with_attachments(
2104 &self,
2105 prompt: &str,
2106 attachments: &[crate::llm::Attachment],
2107 history: Option<&[Message]>,
2108 ) -> Result<AgentResult> {
2109 let use_internal = history.is_none();
2113 let mut effective_history = match history {
2114 Some(h) => h.to_vec(),
2115 None => read_or_recover(&self.history).clone(),
2116 };
2117 effective_history.push(Message::user_with_attachments(prompt, attachments));
2118
2119 let run = self.start_run(prompt).await;
2120 let run_id = run.id().to_string();
2121 let agent_loop = self.build_agent_loop();
2122 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2123 let runtime_state = Arc::clone(&self.active_tools);
2124 let run_store = Arc::clone(&self.run_store);
2125 let collector_run_id = run_id.clone();
2126 let collector_session_id = self.session_id.clone();
2127 let collector_hook_executor = self.ahp_executor.clone();
2128 let runtime_collector = tokio::spawn(async move {
2129 while let Some(event) = runtime_rx.recv().await {
2130 AgentSession::record_runtime_event(
2131 &run_store,
2132 &collector_run_id,
2133 &collector_session_id,
2134 &collector_hook_executor,
2135 &event,
2136 )
2137 .await;
2138 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2139 }
2140 });
2141
2142 let cancel_token = tokio_util::sync::CancellationToken::new();
2143 *self.cancel_token.lock().await = Some(cancel_token.clone());
2144 let result = agent_loop
2145 .execute_from_messages(
2146 effective_history,
2147 Some(&self.session_id),
2148 Some(runtime_tx),
2149 Some(&cancel_token),
2150 )
2151 .await;
2152 *self.cancel_token.lock().await = None;
2153 let _ = runtime_collector.await;
2154 let result = match result {
2155 Ok(result) => result,
2156 Err(error) => {
2157 let _ = self.run_store.mark_failed(&run_id, error.to_string()).await;
2158 self.clear_runtime_tracking().await;
2159 self.finish_run_if_current(&run_id).await;
2160 return Err(error.into());
2161 }
2162 };
2163
2164 if use_internal {
2165 *write_or_recover(&self.history) = result.messages.clone();
2166 self.record_verification_reports(result.verification_reports.clone());
2167 if self.auto_save {
2168 if let Err(e) = self.save().await {
2169 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2170 }
2171 }
2172 }
2173
2174 self.clear_runtime_tracking().await;
2175 self.finish_run_if_current(&run_id).await;
2176
2177 Ok(result)
2178 }
2179
/// Streams an agent run for `prompt` plus `attachments`.
///
/// Returns a receiver of [`AgentEvent`]s and a join handle that completes once
/// the run, the event forwarder, the optional auto-save, and the cleanup of
/// run-tracking state have all finished.
///
/// The user message is appended to `history`, or to a copy of the session's
/// internal history when `history` is `None`. Only in the latter case is the
/// result recorded back on the session and auto-saved.
///
/// Three tasks are spawned: a forwarder (records events and relays them to the
/// caller), a worker (runs the agent loop), and a wrapper that awaits both and
/// then cleans up.
pub async fn stream_with_attachments(
    &self,
    prompt: &str,
    attachments: &[crate::llm::Attachment],
    history: Option<&[Message]>,
) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
    // `tx`/`rx` face the caller; `runtime_tx`/`runtime_rx` carry events from
    // the agent loop to the forwarder.
    let (tx, rx) = mpsc::channel(256);
    let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
    let use_internal = history.is_none();
    let mut effective_history = match history {
        Some(h) => h.to_vec(),
        None => read_or_recover(&self.history).clone(),
    };
    effective_history.push(Message::user_with_attachments(prompt, attachments));

    let run = self.start_run(prompt).await;
    let run_id = run.id().to_string();
    let agent_loop = self.build_agent_loop();
    // Persistence is only set up when the session's own history is in use.
    let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));
    let session_id = self.session_id.clone();
    let cancel_token = tokio_util::sync::CancellationToken::new();
    *self.cancel_token.lock().await = Some(cancel_token.clone());
    let token_clone = cancel_token.clone();
    let runtime_state = Arc::clone(&self.active_tools);
    let run_store = Arc::clone(&self.run_store);
    let forwarder_run_id = run_id.clone();
    let forwarder_session_id = self.session_id.clone();
    let forwarder_hook_executor = self.ahp_executor.clone();
    // Set by the worker on success; read by the wrapper to decide on auto-save.
    let should_auto_save = Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Forwarder: record each event, update active tools, relay to the caller.
    let forwarder = tokio::spawn(async move {
        while let Some(event) = runtime_rx.recv().await {
            AgentSession::record_runtime_event(
                &run_store,
                &forwarder_run_id,
                &forwarder_session_id,
                &forwarder_hook_executor,
                &event,
            )
            .await;
            AgentSession::apply_runtime_event(&runtime_state, &event).await;
            if tx.send(event).await.is_err() {
                // The caller dropped the receiver; stop relaying (and with it,
                // stop recording later events).
                tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
                break;
            }
        }
    });
    let run_store = Arc::clone(&self.run_store);
    let worker_run_id = run_id.clone();
    let persistence_for_worker = persistence.clone();
    let should_auto_save_for_worker = Arc::clone(&should_auto_save);
    // Worker: run the agent loop, then record the result or mark the run failed.
    let handle = tokio::spawn(async move {
        let result = agent_loop
            .execute_from_messages(
                effective_history,
                Some(&session_id),
                Some(runtime_tx),
                Some(&token_clone),
            )
            .await;
        match result {
            Ok(result) => {
                if let Some(persistence) = persistence_for_worker {
                    persistence.record_result(&result);
                    should_auto_save_for_worker
                        .store(true, std::sync::atomic::Ordering::Release);
                }
            }
            Err(error) => {
                let _ = run_store
                    .mark_failed(&worker_run_id, error.to_string())
                    .await;
            }
        }
    });
    let active_tools = Arc::clone(&self.active_tools);
    let current_run_id = Arc::clone(&self.current_run_id);
    let cancel_token_ref = self.cancel_token.clone();
    // Wrapper: wait for worker and forwarder, auto-save if flagged, then reset
    // the cancel token, active tools, and (if still ours) the current run id.
    let wrapped_handle = tokio::spawn(async move {
        let _ = handle.await;
        let _ = forwarder.await;
        if should_auto_save.load(std::sync::atomic::Ordering::Acquire) {
            if let Some(persistence) = persistence {
                persistence.auto_save_if_enabled().await;
            }
        }
        *cancel_token_ref.lock().await = None;
        active_tools.write().await.clear();
        let mut current = current_run_id.lock().await;
        if current.as_deref() == Some(run_id.as_str()) {
            *current = None;
        }
    });

    Ok((rx, wrapped_handle))
}
2281
/// Streams an agent run for `prompt`.
///
/// Returns a receiver of [`AgentEvent`]s and a join handle for the task that
/// drives the stream.
///
/// Slash commands are dispatched to the command registry first. A recognized
/// command never starts a run: a `BtwQuery` action is answered by a one-off
/// LLM call that emits `BtwAnswer` then `End` (or `Error`), and any other
/// command emits its text as `TextDelta` then `End`.
///
/// Otherwise the agent loop runs against `history`, or against the session's
/// internal history when `history` is `None`. Only in the latter case is the
/// result recorded back on the session and auto-saved. Three tasks are
/// spawned: a forwarder, a worker, and a wrapper that awaits both and cleans
/// up run-tracking state.
pub async fn stream(
    &self,
    prompt: &str,
    history: Option<&[Message]>,
) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
    if CommandRegistry::is_command(prompt) {
        let ctx = self.build_command_context();
        // Separate statement so the registry guard is released before spawning.
        let output = self.command_registry().dispatch(prompt, &ctx);
        if let Some(output) = output {
            let (tx, rx) = mpsc::channel(256);

            if let Some(CommandAction::BtwQuery(question)) = output.action {
                // NOTE(review): this path sends only history + question. Unlike
                // `btw_with_context`, it injects no runtime context and does not
                // reject an empty question. Confirm whether that is intended.
                let llm_client = self.llm_client.clone();
                let history_snapshot = read_or_recover(&self.history).clone();
                let handle = tokio::spawn(async move {
                    let mut messages = history_snapshot;
                    messages.push(Message::user(&question));
                    match llm_client
                        .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
                        .await
                    {
                        Ok(response) => {
                            let answer = response.text();
                            let _ = tx
                                .send(AgentEvent::BtwAnswer {
                                    question: question.clone(),
                                    answer: answer.clone(),
                                    usage: response.usage,
                                })
                                .await;
                            // Usage is carried on `BtwAnswer`; `End` reports default usage.
                            let _ = tx
                                .send(AgentEvent::End {
                                    text: answer,
                                    usage: crate::llm::TokenUsage::default(),
                                    verification_summary: Box::new(
                                        crate::verification::VerificationSummary::from_reports(
                                            &[],
                                        ),
                                    ),
                                    meta: None,
                                })
                                .await;
                        }
                        Err(e) => {
                            let _ = tx
                                .send(AgentEvent::Error {
                                    message: format!("btw failed: {e}"),
                                })
                                .await;
                        }
                    }
                });
                return Ok((rx, handle));
            }

            // Any other command: emit its text as a delta followed by `End`.
            let handle = tokio::spawn(async move {
                let _ = tx
                    .send(AgentEvent::TextDelta {
                        text: output.text.clone(),
                    })
                    .await;
                let _ = tx
                    .send(AgentEvent::End {
                        text: output.text.clone(),
                        usage: crate::llm::TokenUsage::default(),
                        verification_summary: Box::new(
                            crate::verification::VerificationSummary::from_reports(&[]),
                        ),
                        meta: None,
                    })
                    .await;
            });
            return Ok((rx, handle));
        }
    }

    // `tx`/`rx` face the caller; `runtime_tx`/`runtime_rx` carry events from
    // the agent loop to the forwarder.
    let (tx, rx) = mpsc::channel(256);
    let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
    let agent_loop = self.build_agent_loop();
    let use_internal = history.is_none();
    let effective_history = match history {
        Some(h) => h.to_vec(),
        None => read_or_recover(&self.history).clone(),
    };
    let run = self.start_run(prompt).await;
    let run_id = run.id().to_string();
    let prompt = prompt.to_string();
    let session_id = self.session_id.clone();
    // Persistence is only set up when the session's own history is in use.
    let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));

    let cancel_token = tokio_util::sync::CancellationToken::new();
    *self.cancel_token.lock().await = Some(cancel_token.clone());
    let token_clone = cancel_token.clone();
    let runtime_state = Arc::clone(&self.active_tools);
    let run_store = Arc::clone(&self.run_store);
    let forwarder_run_id = run_id.clone();
    let forwarder_session_id = self.session_id.clone();
    let forwarder_hook_executor = self.ahp_executor.clone();
    // Set by the worker on success; read by the wrapper to decide on auto-save.
    let should_auto_save = Arc::new(std::sync::atomic::AtomicBool::new(false));
    // Forwarder: record each event, update active tools, relay to the caller.
    let forwarder = tokio::spawn(async move {
        while let Some(event) = runtime_rx.recv().await {
            AgentSession::record_runtime_event(
                &run_store,
                &forwarder_run_id,
                &forwarder_session_id,
                &forwarder_hook_executor,
                &event,
            )
            .await;
            AgentSession::apply_runtime_event(&runtime_state, &event).await;
            if tx.send(event).await.is_err() {
                // The caller dropped the receiver; stop relaying (and with it,
                // stop recording later events).
                tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
                break;
            }
        }
    });

    let run_store = Arc::clone(&self.run_store);
    let worker_run_id = run_id.clone();
    let persistence_for_worker = persistence.clone();
    let should_auto_save_for_worker = Arc::clone(&should_auto_save);
    // Worker: run the agent loop, then record the result or mark the run failed.
    let handle = tokio::spawn(async move {
        let result = agent_loop
            .execute_with_session(
                &effective_history,
                &prompt,
                Some(&session_id),
                Some(runtime_tx),
                Some(&token_clone),
            )
            .await;
        match result {
            Ok(result) => {
                if let Some(persistence) = persistence_for_worker {
                    persistence.record_result(&result);
                    should_auto_save_for_worker
                        .store(true, std::sync::atomic::Ordering::Release);
                }
            }
            Err(error) => {
                let _ = run_store
                    .mark_failed(&worker_run_id, error.to_string())
                    .await;
            }
        }
    });

    let cancel_token_ref = self.cancel_token.clone();
    let active_tools = Arc::clone(&self.active_tools);
    let current_run_id = Arc::clone(&self.current_run_id);
    // Wrapper: wait for worker and forwarder, auto-save if flagged, then reset
    // the cancel token, active tools, and (if still ours) the current run id.
    let wrapped_handle = tokio::spawn(async move {
        let _ = handle.await;
        let _ = forwarder.await;
        if should_auto_save.load(std::sync::atomic::Ordering::Acquire) {
            if let Some(persistence) = persistence {
                persistence.auto_save_if_enabled().await;
            }
        }
        *cancel_token_ref.lock().await = None;
        active_tools.write().await.clear();
        let mut current = current_run_id.lock().await;
        if current.as_deref() == Some(run_id.as_str()) {
            *current = None;
        }
    });

    Ok((rx, wrapped_handle))
}
2465
2466 pub async fn cancel(&self) -> bool {
2473 let token = self.cancel_token.lock().await.clone();
2474 if let Some(token) = token {
2475 token.cancel();
2476 if let Some(run_id) = self.current_run_id.lock().await.clone() {
2477 let _ = self.run_store.mark_cancelled(&run_id).await;
2478 if let Some(executor) = &self.ahp_executor {
2479 executor
2480 .record_run_cancelled(&run_id, &self.session_id, Some("cancelled by host"))
2481 .await;
2482 }
2483 }
2484 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2485 true
2486 } else {
2487 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2488 false
2489 }
2490 }
2491
2492 pub async fn cancel_run(&self, run_id: &str) -> bool {
2497 match self.current_run().await {
2498 Some(run) if run.id() == run_id => run.cancel().await,
2499 _ => false,
2500 }
2501 }
2502
2503 pub async fn runs(&self) -> Vec<crate::run::RunSnapshot> {
2505 self.run_store.list().await
2506 }
2507
2508 pub async fn run_snapshot(&self, run_id: &str) -> Option<crate::run::RunSnapshot> {
2510 self.run_store.snapshot(run_id).await
2511 }
2512
2513 pub async fn run_events(&self, run_id: &str) -> Vec<crate::run::RunEventRecord> {
2515 self.run_store.events(run_id).await
2516 }
2517
2518 pub async fn current_run(&self) -> Option<crate::run::RunHandle> {
2520 let run_id = self.current_run_id.lock().await.clone()?;
2521 let snapshot = self.run_store.snapshot(&run_id).await?;
2522 Some(crate::run::RunHandle::new(
2523 snapshot.id,
2524 snapshot.session_id,
2525 Arc::clone(&self.run_store),
2526 Arc::clone(&self.cancel_token),
2527 Arc::clone(&self.current_run_id),
2528 self.ahp_executor.clone(),
2529 ))
2530 }
2531
2532 pub fn history(&self) -> Vec<Message> {
2534 read_or_recover(&self.history).clone()
2535 }
2536
2537 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2539 self.memory.as_ref()
2540 }
2541
2542 pub fn id(&self) -> &str {
2544 &self.session_id
2545 }
2546
2547 pub fn workspace(&self) -> &std::path::Path {
2549 &self.workspace
2550 }
2551
2552 pub fn init_warning(&self) -> Option<&str> {
2554 self.init_warning.as_deref()
2555 }
2556
2557 pub fn session_id(&self) -> &str {
2559 &self.session_id
2560 }
2561
2562 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2568 self.tool_executor.definitions()
2569 }
2570
2571 pub fn tool_names(&self) -> Vec<String> {
2577 self.tool_executor
2578 .definitions()
2579 .into_iter()
2580 .map(|t| t.name)
2581 .collect()
2582 }
2583
2584 pub fn get_artifact(&self, artifact_uri: &str) -> Option<crate::tools::ToolArtifact> {
2586 self.tool_executor.get_artifact(artifact_uri)
2587 }
2588
2589 pub fn trace_events(&self) -> Vec<crate::trace::TraceEvent> {
2591 self.trace_sink.events()
2592 }
2593
2594 pub fn verification_reports(&self) -> Vec<crate::verification::VerificationReport> {
2596 read_or_recover(&self.verification_reports).clone()
2597 }
2598
2599 pub fn verification_summary(&self) -> crate::verification::VerificationSummary {
2601 crate::verification::VerificationSummary::from_reports(&self.verification_reports())
2602 }
2603
2604 pub fn verification_summary_text(&self) -> String {
2606 crate::verification::format_verification_summary(&self.verification_summary())
2607 }
2608
2609 pub fn record_verification_reports(
2611 &self,
2612 reports: impl IntoIterator<Item = crate::verification::VerificationReport>,
2613 ) {
2614 let mut target = write_or_recover(&self.verification_reports);
2615 target.extend(reports);
2616 }
2617
2618 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2624 self.hook_engine.register(hook);
2625 }
2626
2627 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2629 self.hook_engine.unregister(hook_id)
2630 }
2631
2632 pub fn register_hook_handler(
2634 &self,
2635 hook_id: &str,
2636 handler: Arc<dyn crate::hooks::HookHandler>,
2637 ) {
2638 self.hook_engine.register_handler(hook_id, handler);
2639 }
2640
2641 pub fn unregister_hook_handler(&self, hook_id: &str) {
2643 self.hook_engine.unregister_handler(hook_id);
2644 }
2645
2646 pub fn hook_count(&self) -> usize {
2648 self.hook_engine.hook_count()
2649 }
2650
2651 pub async fn save(&self) -> Result<()> {
2655 SessionPersistenceContext::from_session(self).save().await
2656 }
2657
2658 pub async fn read_file(&self, path: &str) -> Result<String> {
2660 let args = serde_json::json!({ "file_path": path });
2661 let result = self.tool_executor.execute("read", &args).await?;
2662 Ok(result.output)
2663 }
2664
2665 pub async fn bash(&self, command: &str) -> Result<String> {
2671 let args = serde_json::json!({ "command": command });
2672 let result = self
2673 .tool_executor
2674 .execute_with_context("bash", &args, &self.tool_context)
2675 .await?;
2676 Ok(result.output)
2677 }
2678
2679 pub async fn verify_commands(
2681 &self,
2682 subject: &str,
2683 commands: &[crate::verification::VerificationCommand],
2684 ) -> Result<crate::verification::VerificationReport> {
2685 let mut checks = Vec::with_capacity(commands.len());
2686
2687 for command in commands {
2688 let mut args = serde_json::json!({ "command": command.command });
2689 if let Some(timeout_ms) = command.timeout_ms {
2690 args["timeout"] = serde_json::json!(timeout_ms);
2691 }
2692
2693 let check = match self
2694 .tool_executor
2695 .execute_with_context("bash", &args, &self.tool_context)
2696 .await
2697 {
2698 Ok(result) => {
2699 let exit_code = result
2700 .metadata
2701 .as_ref()
2702 .and_then(|metadata| metadata.get("exit_code"))
2703 .and_then(|value| value.as_i64())
2704 .and_then(|value| i32::try_from(value).ok())
2705 .unwrap_or(result.exit_code);
2706 command.check_from_execution(exit_code, result.metadata.as_ref(), None)
2707 }
2708 Err(err) => command.check_from_execution(1, None, Some(&err.to_string())),
2709 };
2710 checks.push(check);
2711 }
2712
2713 let report = crate::verification::VerificationReport::new(subject, checks);
2714 self.record_verification_reports([report.clone()]);
2715 if self.auto_save {
2716 if let Err(e) = self.save().await {
2717 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2718 }
2719 }
2720
2721 Ok(report)
2722 }
2723
2724 pub fn verification_presets(&self) -> Vec<crate::verification::VerificationPreset> {
2726 crate::verification::verification_presets_for_workspace(&self.workspace)
2727 }
2728
2729 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2731 let args = serde_json::json!({ "pattern": pattern });
2732 let result = self.tool_executor.execute("glob", &args).await?;
2733 let files: Vec<String> = result
2734 .output
2735 .lines()
2736 .filter(|l| !l.is_empty())
2737 .map(|l| l.to_string())
2738 .collect();
2739 Ok(files)
2740 }
2741
2742 pub async fn grep(&self, pattern: &str) -> Result<String> {
2744 let args = serde_json::json!({ "pattern": pattern });
2745 let result = self.tool_executor.execute("grep", &args).await?;
2746 Ok(result.output)
2747 }
2748
2749 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2751 let result = self.tool_executor.execute(name, &args).await?;
2752 Ok(ToolCallResult {
2753 name: name.to_string(),
2754 output: result.output,
2755 exit_code: result.exit_code,
2756 metadata: result.metadata,
2757 })
2758 }
2759
2760 pub fn has_queue(&self) -> bool {
2766 self.command_queue.is_some()
2767 }
2768
2769 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2773 if let Some(ref queue) = self.command_queue {
2774 queue.set_lane_handler(lane, config).await;
2775 }
2776 }
2777
2778 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2782 if let Some(ref queue) = self.command_queue {
2783 queue.complete_external_task(task_id, result).await
2784 } else {
2785 false
2786 }
2787 }
2788
2789 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2791 if let Some(ref queue) = self.command_queue {
2792 queue.pending_external_tasks().await
2793 } else {
2794 Vec::new()
2795 }
2796 }
2797
2798 pub async fn queue_stats(&self) -> SessionQueueStats {
2800 if let Some(ref queue) = self.command_queue {
2801 queue.stats().await
2802 } else {
2803 SessionQueueStats::default()
2804 }
2805 }
2806
2807 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2809 if let Some(ref queue) = self.command_queue {
2810 queue.metrics_snapshot().await
2811 } else {
2812 None
2813 }
2814 }
2815
2816 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2818 if let Some(ref queue) = self.command_queue {
2819 queue.dead_letters().await
2820 } else {
2821 Vec::new()
2822 }
2823 }
2824
2825 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2838 use crate::subagent::load_agents_from_dir;
2839 let agents = load_agents_from_dir(dir);
2840 let count = agents.len();
2841 for agent in agents {
2842 tracing::info!(
2843 session_id = %self.session_id,
2844 agent = agent.name,
2845 dir = %dir.display(),
2846 "Dynamically registered agent"
2847 );
2848 self.agent_registry.register(agent);
2849 }
2850 count
2851 }
2852
2853 pub async fn add_mcp_server(
2860 &self,
2861 config: crate::mcp::McpServerConfig,
2862 ) -> crate::error::Result<usize> {
2863 let server_name = config.name.clone();
2864 self.mcp_manager.register_server(config).await;
2865 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2866 crate::error::CodeError::Tool {
2867 tool: server_name.clone(),
2868 message: format!("Failed to connect MCP server: {}", e),
2869 }
2870 })?;
2871
2872 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2873 let count = tools.len();
2874
2875 for tool in
2876 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2877 {
2878 self.tool_executor.register_dynamic_tool(tool);
2879 }
2880
2881 tracing::info!(
2882 session_id = %self.session_id,
2883 server = server_name,
2884 tools = count,
2885 "MCP server added to live session"
2886 );
2887
2888 Ok(count)
2889 }
2890
2891 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2896 self.tool_executor
2897 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2898 self.mcp_manager
2899 .disconnect(server_name)
2900 .await
2901 .map_err(|e| crate::error::CodeError::Tool {
2902 tool: server_name.to_string(),
2903 message: format!("Failed to disconnect MCP server: {}", e),
2904 })?;
2905 tracing::info!(
2906 session_id = %self.session_id,
2907 server = server_name,
2908 "MCP server removed from live session"
2909 );
2910 Ok(())
2911 }
2912
2913 pub async fn mcp_status(
2915 &self,
2916 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2917 self.mcp_manager.get_status().await
2918 }
2919}
2920
2921#[cfg(test)]
2926mod tests {
2927 use super::*;
2928 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2929 use crate::llm::{ContentBlock, LlmResponse, StreamEvent, TokenUsage};
2930 use crate::store::SessionStore;
2931
2932 #[derive(Clone)]
2933 struct StaticStreamingClient {
2934 text: String,
2935 }
2936
2937 impl StaticStreamingClient {
2938 fn new(text: impl Into<String>) -> Self {
2939 Self { text: text.into() }
2940 }
2941
2942 fn response(&self) -> LlmResponse {
2943 LlmResponse {
2944 message: Message {
2945 role: "assistant".to_string(),
2946 content: vec![ContentBlock::Text {
2947 text: self.text.clone(),
2948 }],
2949 reasoning_content: None,
2950 },
2951 usage: TokenUsage {
2952 prompt_tokens: 1,
2953 completion_tokens: 1,
2954 total_tokens: 2,
2955 cache_read_tokens: None,
2956 cache_write_tokens: None,
2957 },
2958 stop_reason: Some("end_turn".to_string()),
2959 meta: None,
2960 }
2961 }
2962 }
2963
2964 #[derive(Clone)]
2965 struct FailingStreamingClient;
2966
2967 #[derive(Clone)]
2968 struct CancellableStreamingClient {
2969 text: String,
2970 }
2971
2972 #[derive(Debug, Default)]
2973 struct RecordingRuntimeHook {
2974 events: std::sync::Mutex<Vec<(String, String, AgentEvent)>>,
2975 }
2976
2977 #[derive(Debug, Default)]
2978 struct CapturingContextProvider {
2979 session_ids: std::sync::Mutex<Vec<Option<String>>>,
2980 }
2981
2982 #[async_trait::async_trait]
2983 impl crate::context::ContextProvider for CapturingContextProvider {
2984 fn name(&self) -> &str {
2985 "capturing-context"
2986 }
2987
2988 async fn query(
2989 &self,
2990 query: &crate::context::ContextQuery,
2991 ) -> anyhow::Result<crate::context::ContextResult> {
2992 self.session_ids
2993 .lock()
2994 .unwrap()
2995 .push(query.session_id.clone());
2996 Ok(crate::context::ContextResult::new(self.name()))
2997 }
2998 }
2999
3000 #[async_trait::async_trait]
3001 impl crate::hooks::HookExecutor for RecordingRuntimeHook {
3002 async fn fire(&self, _event: &crate::hooks::HookEvent) -> crate::hooks::HookResult {
3003 crate::hooks::HookResult::Continue(None)
3004 }
3005
3006 async fn record_agent_event(&self, event: &AgentEvent, run_id: &str, session_id: &str) {
3007 self.events.lock().unwrap().push((
3008 run_id.to_string(),
3009 session_id.to_string(),
3010 event.clone(),
3011 ));
3012 }
3013 }
3014
3015 impl CancellableStreamingClient {
3016 fn new(text: impl Into<String>) -> Self {
3017 Self { text: text.into() }
3018 }
3019 }
3020
3021 #[async_trait::async_trait]
3022 impl LlmClient for StaticStreamingClient {
3023 async fn complete(
3024 &self,
3025 _messages: &[Message],
3026 _system: Option<&str>,
3027 _tools: &[crate::llm::ToolDefinition],
3028 ) -> anyhow::Result<LlmResponse> {
3029 Ok(self.response())
3030 }
3031
3032 async fn complete_streaming(
3033 &self,
3034 _messages: &[Message],
3035 _system: Option<&str>,
3036 _tools: &[crate::llm::ToolDefinition],
3037 _cancel_token: tokio_util::sync::CancellationToken,
3038 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3039 let (tx, rx) = mpsc::channel(8);
3040 let text = self.text.clone();
3041 let response = self.response();
3042 tokio::spawn(async move {
3043 let _ = tx.send(StreamEvent::TextDelta(text)).await;
3044 let _ = tx.send(StreamEvent::Done(response)).await;
3045 });
3046 Ok(rx)
3047 }
3048 }
3049
3050 #[async_trait::async_trait]
3051 impl LlmClient for FailingStreamingClient {
3052 async fn complete(
3053 &self,
3054 _messages: &[Message],
3055 _system: Option<&str>,
3056 _tools: &[crate::llm::ToolDefinition],
3057 ) -> anyhow::Result<LlmResponse> {
3058 anyhow::bail!("non-streaming fallback failed")
3059 }
3060
3061 async fn complete_streaming(
3062 &self,
3063 _messages: &[Message],
3064 _system: Option<&str>,
3065 _tools: &[crate::llm::ToolDefinition],
3066 _cancel_token: tokio_util::sync::CancellationToken,
3067 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3068 anyhow::bail!("streaming setup failed")
3069 }
3070 }
3071
3072 #[async_trait::async_trait]
3073 impl LlmClient for CancellableStreamingClient {
3074 async fn complete(
3075 &self,
3076 _messages: &[Message],
3077 _system: Option<&str>,
3078 _tools: &[crate::llm::ToolDefinition],
3079 ) -> anyhow::Result<LlmResponse> {
3080 anyhow::bail!("cancellable client does not support fallback completion")
3081 }
3082
3083 async fn complete_streaming(
3084 &self,
3085 _messages: &[Message],
3086 _system: Option<&str>,
3087 _tools: &[crate::llm::ToolDefinition],
3088 cancel_token: tokio_util::sync::CancellationToken,
3089 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3090 let (tx, rx) = mpsc::channel(8);
3091 let text = self.text.clone();
3092 tokio::spawn(async move {
3093 let _ = tx.send(StreamEvent::TextDelta(text)).await;
3094 cancel_token.cancelled().await;
3095 });
3096 Ok(rx)
3097 }
3098 }
3099
3100 fn test_config() -> CodeConfig {
3101 CodeConfig {
3102 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
3103 providers: vec![
3104 ProviderConfig {
3105 name: "anthropic".to_string(),
3106 api_key: Some("test-key".to_string()),
3107 base_url: None,
3108 headers: std::collections::HashMap::new(),
3109 session_id_header: None,
3110 models: vec![ModelConfig {
3111 id: "claude-sonnet-4-20250514".to_string(),
3112 name: "Claude Sonnet 4".to_string(),
3113 family: "claude-sonnet".to_string(),
3114 api_key: None,
3115 base_url: None,
3116 headers: std::collections::HashMap::new(),
3117 session_id_header: None,
3118 attachment: false,
3119 reasoning: false,
3120 tool_call: true,
3121 temperature: true,
3122 release_date: None,
3123 modalities: ModelModalities::default(),
3124 cost: Default::default(),
3125 limit: Default::default(),
3126 }],
3127 },
3128 ProviderConfig {
3129 name: "openai".to_string(),
3130 api_key: Some("test-openai-key".to_string()),
3131 base_url: None,
3132 headers: std::collections::HashMap::new(),
3133 session_id_header: None,
3134 models: vec![ModelConfig {
3135 id: "gpt-4o".to_string(),
3136 name: "GPT-4o".to_string(),
3137 family: "gpt-4".to_string(),
3138 api_key: None,
3139 base_url: None,
3140 headers: std::collections::HashMap::new(),
3141 session_id_header: None,
3142 attachment: false,
3143 reasoning: false,
3144 tool_call: true,
3145 temperature: true,
3146 release_date: None,
3147 modalities: ModelModalities::default(),
3148 cost: Default::default(),
3149 limit: Default::default(),
3150 }],
3151 },
3152 ],
3153 ..Default::default()
3154 }
3155 }
3156
3157 fn build_effective_registry_for_test(
3158 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
3159 opts: &SessionOptions,
3160 ) -> Arc<crate::skills::SkillRegistry> {
3161 let base_registry = agent_registry
3162 .as_deref()
3163 .map(|r| r.fork())
3164 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
3165 if let Some(ref r) = opts.skill_registry {
3166 for skill in r.all() {
3167 base_registry.register_unchecked(skill);
3168 }
3169 }
3170 for dir in &opts.skill_dirs {
3171 if let Err(e) = base_registry.load_from_dir(dir) {
3172 tracing::warn!(
3173 dir = %dir.display(),
3174 error = %e,
3175 "Failed to load session skill dir — skipping"
3176 );
3177 }
3178 }
3179 Arc::new(base_registry)
3180 }
3181
3182 #[tokio::test]
3183 async fn test_from_config() {
3184 let agent = Agent::from_config(test_config()).await;
3185 assert!(agent.is_ok());
3186 }
3187
3188 #[tokio::test]
3189 async fn test_session_default() {
3190 let agent = Agent::from_config(test_config()).await.unwrap();
3191 let session = agent.session("/tmp/test-workspace", None);
3192 assert!(session.is_ok());
3193 let debug = format!("{:?}", session.unwrap());
3194 assert!(debug.contains("AgentSession"));
3195 }
3196
3197 #[tokio::test]
3198 async fn test_session_routes_agents_md_through_context_provider() {
3199 let temp_dir = tempfile::tempdir().unwrap();
3200 std::fs::write(
3201 temp_dir.path().join("AGENTS.md"),
3202 "Always run focused tests before reporting completion.",
3203 )
3204 .unwrap();
3205
3206 let agent = Agent::from_config(test_config()).await.unwrap();
3207 let session = agent
3208 .session(temp_dir.path().display().to_string(), None)
3209 .unwrap();
3210
3211 let agents_provider = session
3212 .config
3213 .context_providers
3214 .iter()
3215 .find(|provider| provider.name() == "agents_md")
3216 .expect("AGENTS.md provider should be registered");
3217 assert!(!session
3218 .config
3219 .prompt_slots
3220 .extra
3221 .as_deref()
3222 .unwrap_or_default()
3223 .contains("Project Instructions (AGENTS.md)"));
3224
3225 let result = agents_provider
3226 .query(&crate::context::ContextQuery::new("complete the task"))
3227 .await
3228 .unwrap();
3229
3230 assert_eq!(result.items.len(), 1);
3231 assert_eq!(result.items[0].id, "agents_md");
3232 assert!(result.items[0]
3233 .content
3234 .contains("Always run focused tests before reporting completion."));
3235 assert_eq!(result.items[0].relevance, 0.95);
3236 }
3237
3238 #[tokio::test]
3239 async fn test_session_initializes_without_legacy_agentic_tools() {
3240 let agent = Agent::from_config(test_config()).await.unwrap();
3241 let _session = agent.session("/tmp/test-workspace", None).unwrap();
3242 }
3243
3244 #[tokio::test]
3245 async fn test_session_with_model_override() {
3246 let agent = Agent::from_config(test_config()).await.unwrap();
3247 let opts = SessionOptions::new().with_model("openai/gpt-4o");
3248 let session = agent.session("/tmp/test-workspace", Some(opts));
3249 assert!(session.is_ok());
3250 }
3251
3252 #[tokio::test]
3253 async fn test_session_with_invalid_model_format() {
3254 let agent = Agent::from_config(test_config()).await.unwrap();
3255 let opts = SessionOptions::new().with_model("gpt-4o");
3256 let session = agent.session("/tmp/test-workspace", Some(opts));
3257 assert!(session.is_err());
3258 }
3259
3260 #[tokio::test]
3261 async fn test_session_with_model_not_found() {
3262 let agent = Agent::from_config(test_config()).await.unwrap();
3263 let opts = SessionOptions::new().with_model("openai/nonexistent");
3264 let session = agent.session("/tmp/test-workspace", Some(opts));
3265 assert!(session.is_err());
3266 }
3267
3268 #[tokio::test]
3269 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
3270 use crate::skills::validator::DefaultSkillValidator;
3271 use crate::skills::SkillRegistry;
3272
3273 let registry = Arc::new(SkillRegistry::new());
3274 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
3275
3276 let temp_dir = tempfile::tempdir().unwrap();
3277 let invalid_skill = temp_dir.path().join("invalid.md");
3278 std::fs::write(
3279 &invalid_skill,
3280 r#"---
3281name: BadName
3282description: "invalid skill name"
3283kind: instruction
3284---
3285# Invalid Skill
3286"#,
3287 )
3288 .unwrap();
3289
3290 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
3291 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
3292 assert!(effective_registry.get("BadName").is_none());
3293 }
3294
3295 #[tokio::test]
3296 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
3297 use crate::skills::{Skill, SkillKind, SkillRegistry};
3298
3299 let registry = Arc::new(SkillRegistry::new());
3300 registry.register_unchecked(Arc::new(Skill {
3301 name: "shared-skill".to_string(),
3302 description: "agent level".to_string(),
3303 allowed_tools: None,
3304 disable_model_invocation: false,
3305 kind: SkillKind::Instruction,
3306 content: "agent content".to_string(),
3307 tags: vec![],
3308 version: None,
3309 }));
3310
3311 let session_registry = Arc::new(SkillRegistry::new());
3312 session_registry.register_unchecked(Arc::new(Skill {
3313 name: "shared-skill".to_string(),
3314 description: "session level".to_string(),
3315 allowed_tools: None,
3316 disable_model_invocation: false,
3317 kind: SkillKind::Instruction,
3318 content: "session content".to_string(),
3319 tags: vec![],
3320 version: None,
3321 }));
3322
3323 let opts = SessionOptions::new().with_skill_registry(session_registry);
3324 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
3325
3326 assert_eq!(
3327 effective_registry.get("shared-skill").unwrap().content,
3328 "session content"
3329 );
3330 assert_eq!(
3331 registry.get("shared-skill").unwrap().content,
3332 "agent content"
3333 );
3334 }
3335
3336 #[tokio::test]
3337 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3338 use crate::skills::{Skill, SkillKind, SkillRegistry};
3339
3340 let session_registry = Arc::new(SkillRegistry::new());
3341 session_registry.register_unchecked(Arc::new(Skill {
3342 name: "shared-skill".to_string(),
3343 description: "session registry".to_string(),
3344 allowed_tools: None,
3345 disable_model_invocation: false,
3346 kind: SkillKind::Instruction,
3347 content: "registry content".to_string(),
3348 tags: vec![],
3349 version: None,
3350 }));
3351
3352 let temp_dir = tempfile::tempdir().unwrap();
3353 std::fs::write(
3354 temp_dir.path().join("shared.md"),
3355 r#"---
3356name: shared-skill
3357description: "skill dir override"
3358kind: instruction
3359---
3360# Shared Skill
3361dir content
3362"#,
3363 )
3364 .unwrap();
3365 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3366
3367 let opts = SessionOptions::new()
3368 .with_skill_registry(session_registry)
3369 .with_skill_dirs([temp_dir.path()]);
3370 let effective_registry = build_effective_registry_for_test(None, &opts);
3371
3372 assert_eq!(
3373 effective_registry.get("shared-skill").unwrap().description,
3374 "skill dir override"
3375 );
3376 assert!(effective_registry.get("README").is_none());
3377 }
3378
3379 #[tokio::test]
3380 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3381 use crate::skills::{Skill, SkillKind, SkillRegistry};
3382
3383 let mut agent = Agent::from_config(test_config()).await.unwrap();
3384 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3385 agent.config.skill_registry = Some(agent_registry);
3386
3387 let session_registry = Arc::new(SkillRegistry::new());
3388 session_registry.register_unchecked(Arc::new(Skill {
3389 name: "session-only".to_string(),
3390 description: "only for first session".to_string(),
3391 allowed_tools: None,
3392 disable_model_invocation: false,
3393 kind: SkillKind::Instruction,
3394 content: "session one".to_string(),
3395 tags: vec![],
3396 version: None,
3397 }));
3398
3399 let session_one = agent
3400 .session(
3401 "/tmp/test-workspace",
3402 Some(SessionOptions::new().with_skill_registry(session_registry)),
3403 )
3404 .unwrap();
3405 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3406
3407 assert!(session_one
3408 .config
3409 .skill_registry
3410 .as_ref()
3411 .unwrap()
3412 .get("session-only")
3413 .is_some());
3414 assert!(session_two
3415 .config
3416 .skill_registry
3417 .as_ref()
3418 .unwrap()
3419 .get("session-only")
3420 .is_none());
3421 }
3422
3423 #[tokio::test]
3424 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3425 use crate::skills::{Skill, SkillKind, SkillRegistry};
3426 use crate::subagent::AgentDefinition;
3427
3428 let mut agent = Agent::from_config(test_config()).await.unwrap();
3429 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3430
3431 let definition = AgentDefinition::new("reviewer", "Review code")
3432 .with_prompt("Agent definition prompt")
3433 .with_max_steps(7);
3434
3435 let session_registry = Arc::new(SkillRegistry::new());
3436 session_registry.register_unchecked(Arc::new(Skill {
3437 name: "agent-session-skill".to_string(),
3438 description: "agent session only".to_string(),
3439 allowed_tools: None,
3440 disable_model_invocation: false,
3441 kind: SkillKind::Instruction,
3442 content: "agent session content".to_string(),
3443 tags: vec![],
3444 version: None,
3445 }));
3446
3447 let session_one = agent
3448 .session_for_agent(
3449 "/tmp/test-workspace",
3450 &definition,
3451 Some(SessionOptions::new().with_skill_registry(session_registry)),
3452 )
3453 .unwrap();
3454 let session_two = agent
3455 .session_for_agent("/tmp/test-workspace", &definition, None)
3456 .unwrap();
3457
3458 assert_eq!(session_one.config.max_tool_rounds, 7);
3459 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3460 assert!(extra.contains("Agent definition prompt"));
3461 assert!(!extra.contains("agent-session-skill"));
3462 assert!(session_one
3463 .config
3464 .context_providers
3465 .iter()
3466 .any(|provider| provider.name() == "skills_catalog"));
3467 assert!(session_one
3468 .config
3469 .skill_registry
3470 .as_ref()
3471 .unwrap()
3472 .get("agent-session-skill")
3473 .is_some());
3474 assert!(session_two
3475 .config
3476 .skill_registry
3477 .as_ref()
3478 .unwrap()
3479 .get("agent-session-skill")
3480 .is_none());
3481 }
3482
3483 #[tokio::test]
3484 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3485 ) {
3486 use crate::prompts::SystemPromptSlots;
3487 use crate::subagent::AgentDefinition;
3488
3489 let agent = Agent::from_config(test_config()).await.unwrap();
3490 let definition = AgentDefinition::new("planner", "Plan work")
3491 .with_prompt("Definition extra prompt")
3492 .with_max_steps(3);
3493
3494 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3495 style: None,
3496 role: Some("Custom role".to_string()),
3497 guidelines: None,
3498 response_style: None,
3499 extra: None,
3500 });
3501
3502 let session = agent
3503 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3504 .unwrap();
3505
3506 assert_eq!(
3507 session.config.prompt_slots.role.as_deref(),
3508 Some("Custom role")
3509 );
3510 assert!(session
3511 .config
3512 .prompt_slots
3513 .extra
3514 .as_deref()
3515 .unwrap()
3516 .contains("Definition extra prompt"));
3517 assert_eq!(session.config.max_tool_rounds, 3);
3518 }
3519
3520 #[tokio::test]
3521 async fn test_new_with_acl_string() {
3522 let acl = r#"
3523 default_model = "anthropic/claude-sonnet-4-20250514"
3524 providers "anthropic" {
3525 apiKey = "test-key"
3526 models "claude-sonnet-4-20250514" {
3527 name = "Claude Sonnet 4"
3528 }
3529 }
3530 "#;
3531 let agent = Agent::new(acl).await;
3532 assert!(agent.is_ok());
3533 }
3534
3535 #[tokio::test]
3536 async fn test_create_alias_acl() {
3537 let acl = r#"
3538 default_model = "anthropic/claude-sonnet-4-20250514"
3539 providers "anthropic" {
3540 apiKey = "test-key"
3541 models "claude-sonnet-4-20250514" {
3542 name = "Claude Sonnet 4"
3543 }
3544 }
3545 "#;
3546 let agent = Agent::create(acl).await;
3547 assert!(agent.is_ok());
3548 }
3549
3550 #[tokio::test]
3551 async fn test_create_and_new_produce_same_result() {
3552 let acl = r#"
3553 default_model = "anthropic/claude-sonnet-4-20250514"
3554 providers "anthropic" {
3555 apiKey = "test-key"
3556 models "claude-sonnet-4-20250514" {
3557 name = "Claude Sonnet 4"
3558 }
3559 }
3560 "#;
3561 let agent_new = Agent::new(acl).await;
3562 let agent_create = Agent::create(acl).await;
3563 assert!(agent_new.is_ok());
3564 assert!(agent_create.is_ok());
3565
3566 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3568 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3569 assert!(session_new.is_ok());
3570 assert!(session_create.is_ok());
3571 }
3572
3573 #[tokio::test]
3574 async fn test_new_with_existing_acl_file_uses_file_loading() {
3575 let temp_dir = tempfile::tempdir().unwrap();
3576 let config_path = temp_dir.path().join("agent.acl");
3577 std::fs::write(&config_path, "providers {").unwrap();
3578
3579 let err = Agent::new(config_path.display().to_string())
3580 .await
3581 .unwrap_err();
3582 let msg = err.to_string();
3583
3584 assert!(msg.contains("Failed to load config"));
3585 assert!(msg.contains("agent.acl"));
3586 assert!(!msg.contains("Failed to parse config as ACL string"));
3587 }
3588
3589 #[tokio::test]
3590 async fn test_new_with_missing_acl_file_reports_not_found() {
3591 let temp_dir = tempfile::tempdir().unwrap();
3592 let missing_path = temp_dir.path().join("agent.acl");
3593
3594 let err = Agent::new(missing_path.display().to_string())
3595 .await
3596 .unwrap_err();
3597 let msg = err.to_string();
3598
3599 assert!(msg.contains("Config file not found"));
3600 assert!(msg.contains("agent.acl"));
3601 assert!(!msg.contains("Failed to parse config as ACL string"));
3602 }
3603
3604 #[tokio::test]
3605 async fn test_new_rejects_hcl_files() {
3606 let temp_dir = tempfile::tempdir().unwrap();
3607 let config_path = temp_dir.path().join("agent.hcl");
3608 std::fs::write(&config_path, "default_model = \"openai/test\"").unwrap();
3609
3610 let err = Agent::new(config_path.display().to_string())
3611 .await
3612 .unwrap_err();
3613 let msg = err.to_string();
3614
3615 assert!(msg.contains("HCL config files are not supported in 2.0"));
3616 assert!(msg.contains(".acl"));
3617 }
3618
3619 #[test]
3620 fn test_from_config_requires_default_model() {
3621 let rt = tokio::runtime::Runtime::new().unwrap();
3622 let config = CodeConfig {
3623 providers: vec![ProviderConfig {
3624 name: "anthropic".to_string(),
3625 api_key: Some("test-key".to_string()),
3626 base_url: None,
3627 headers: std::collections::HashMap::new(),
3628 session_id_header: None,
3629 models: vec![],
3630 }],
3631 ..Default::default()
3632 };
3633 let result = rt.block_on(Agent::from_config(config));
3634 assert!(result.is_err());
3635 }
3636
3637 #[tokio::test]
3638 async fn test_history_empty_on_new_session() {
3639 let agent = Agent::from_config(test_config()).await.unwrap();
3640 let session = agent.session("/tmp/test-workspace", None).unwrap();
3641 assert!(session.history().is_empty());
3642 }
3643
3644 #[tokio::test]
3645 async fn test_stream_updates_history_and_auto_saves() {
3646 let store = Arc::new(crate::store::MemorySessionStore::new());
3647 let agent = Agent::from_config(test_config()).await.unwrap();
3648 let opts = SessionOptions::new()
3649 .with_session_store(store.clone())
3650 .with_session_id("stream-history-test")
3651 .with_auto_save(true);
3652 let session = agent
3653 .build_session(
3654 "/tmp/test-stream-history".into(),
3655 Arc::new(StaticStreamingClient::new("streamed answer")),
3656 &opts,
3657 )
3658 .unwrap();
3659
3660 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3661 let mut saw_end = false;
3662 while let Some(event) = rx.recv().await {
3663 if matches!(event, AgentEvent::End { .. }) {
3664 saw_end = true;
3665 break;
3666 }
3667 }
3668 handle.await.unwrap();
3669
3670 assert!(saw_end);
3671 let history = session.history();
3672 assert_eq!(history.len(), 2);
3673 assert_eq!(history[0].text(), "hello");
3674 assert_eq!(history[1].text(), "streamed answer");
3675
3676 let saved = store
3677 .load("stream-history-test")
3678 .await
3679 .unwrap()
3680 .expect("saved session");
3681 assert_eq!(saved.messages.len(), 2);
3682 assert_eq!(saved.messages[1].text(), "streamed answer");
3683
3684 let run_records = store
3685 .load_run_records("stream-history-test")
3686 .await
3687 .unwrap()
3688 .expect("saved run records");
3689 assert_eq!(run_records.len(), 1);
3690 assert_eq!(
3691 run_records[0].snapshot.status,
3692 crate::run::RunStatus::Completed
3693 );
3694 assert!(run_records[0]
3695 .events
3696 .iter()
3697 .any(|record| matches!(record.event, AgentEvent::End { .. })));
3698 }
3699
3700 #[tokio::test]
3701 async fn test_stream_with_custom_history_does_not_update_session_history() {
3702 let agent = Agent::from_config(test_config()).await.unwrap();
3703 let session = agent
3704 .build_session(
3705 "/tmp/test-stream-custom-history".into(),
3706 Arc::new(StaticStreamingClient::new("custom history answer")),
3707 &SessionOptions::new(),
3708 )
3709 .unwrap();
3710 let custom_history = vec![Message::user("custom prompt")];
3711
3712 let (mut rx, handle) = session
3713 .stream("ignored", Some(&custom_history))
3714 .await
3715 .unwrap();
3716 while let Some(event) = rx.recv().await {
3717 if matches!(event, AgentEvent::End { .. }) {
3718 break;
3719 }
3720 }
3721 handle.await.unwrap();
3722
3723 assert!(session.history().is_empty());
3724 }
3725
3726 #[tokio::test]
3727 async fn test_stream_error_does_not_update_history_or_auto_save() {
3728 let store = Arc::new(crate::store::MemorySessionStore::new());
3729 let agent = Agent::from_config(test_config()).await.unwrap();
3730 let opts = SessionOptions::new()
3731 .with_session_store(store.clone())
3732 .with_session_id("stream-error-test")
3733 .with_auto_save(true);
3734 let session = agent
3735 .build_session(
3736 "/tmp/test-stream-error".into(),
3737 Arc::new(FailingStreamingClient),
3738 &opts,
3739 )
3740 .unwrap();
3741
3742 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3743 let mut saw_error = false;
3744 while let Some(event) = rx.recv().await {
3745 if matches!(event, AgentEvent::Error { .. }) {
3746 saw_error = true;
3747 break;
3748 }
3749 }
3750 handle.await.unwrap();
3751
3752 assert!(saw_error);
3753 assert!(session.history().is_empty());
3754 assert!(store.load("stream-error-test").await.unwrap().is_none());
3755 }
3756
3757 #[tokio::test]
3758 async fn test_stream_cancel_does_not_update_history_or_auto_save() {
3759 let store = Arc::new(crate::store::MemorySessionStore::new());
3760 let agent = Agent::from_config(test_config()).await.unwrap();
3761 let opts = SessionOptions::new()
3762 .with_session_store(store.clone())
3763 .with_session_id("stream-cancel-test")
3764 .with_auto_save(true);
3765 let session = agent
3766 .build_session(
3767 "/tmp/test-stream-cancel".into(),
3768 Arc::new(CancellableStreamingClient::new("partial answer")),
3769 &opts,
3770 )
3771 .unwrap();
3772
3773 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3774 let mut saw_delta = false;
3775 for _ in 0..16 {
3776 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
3777 .await
3778 .expect("stream event before timeout")
3779 .expect("stream should stay open until cancelled");
3780 if matches!(event, AgentEvent::TextDelta { ref text } if text == "partial answer") {
3781 saw_delta = true;
3782 break;
3783 }
3784 }
3785 assert!(saw_delta);
3786 assert!(session.cancel().await);
3787
3788 while rx.recv().await.is_some() {}
3789 handle.await.unwrap();
3790
3791 assert!(session.history().is_empty());
3792 assert!(store.load("stream-cancel-test").await.unwrap().is_none());
3793 assert!(!session.cancel().await);
3794 }
3795
3796 #[tokio::test]
3797 async fn test_stream_with_attachments_cancel_does_not_update_history_or_auto_save() {
3798 let store = Arc::new(crate::store::MemorySessionStore::new());
3799 let agent = Agent::from_config(test_config()).await.unwrap();
3800 let opts = SessionOptions::new()
3801 .with_session_store(store.clone())
3802 .with_session_id("stream-attachments-cancel-test")
3803 .with_auto_save(true);
3804 let session = agent
3805 .build_session(
3806 "/tmp/test-stream-attachments-cancel".into(),
3807 Arc::new(CancellableStreamingClient::new("partial attachment answer")),
3808 &opts,
3809 )
3810 .unwrap();
3811 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3812
3813 let (mut rx, handle) = session
3814 .stream_with_attachments("hello", &attachments, None)
3815 .await
3816 .unwrap();
3817 let mut saw_delta = false;
3818 for _ in 0..16 {
3819 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
3820 .await
3821 .expect("stream event before timeout")
3822 .expect("stream should stay open until cancelled");
3823 if matches!(event, AgentEvent::TextDelta { .. }) {
3824 saw_delta = true;
3825 break;
3826 }
3827 }
3828 assert!(saw_delta);
3829 assert!(session.cancel().await);
3830
3831 while rx.recv().await.is_some() {}
3832 handle.await.unwrap();
3833
3834 assert!(session.history().is_empty());
3835 assert!(store
3836 .load("stream-attachments-cancel-test")
3837 .await
3838 .unwrap()
3839 .is_none());
3840 assert_eq!(
3841 session.runs().await[0].status,
3842 crate::run::RunStatus::Cancelled
3843 );
3844 assert!(!session.cancel().await);
3845 }
3846
3847 #[tokio::test]
3848 async fn test_run_handle_cancels_send_with_attachments() {
3849 let agent = Agent::from_config(test_config()).await.unwrap();
3850 let session = Arc::new(
3851 agent
3852 .build_session(
3853 "/tmp/test-send-attachments-run-handle-cancel".into(),
3854 Arc::new(CancellableStreamingClient::new("partial answer")),
3855 &SessionOptions::new(),
3856 )
3857 .unwrap(),
3858 );
3859 let worker_session = Arc::clone(&session);
3860 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3861
3862 let worker = tokio::spawn(async move {
3863 worker_session
3864 .send_with_attachments("hello", &attachments, None)
3865 .await
3866 });
3867
3868 let mut run = None;
3869 for _ in 0..20 {
3870 if let Some(current) = session.current_run().await {
3871 run = Some(current);
3872 break;
3873 }
3874 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3875 }
3876 let run = run.expect("current run should be visible");
3877 assert!(run.cancel().await);
3878
3879 let result = tokio::time::timeout(std::time::Duration::from_secs(1), worker)
3880 .await
3881 .expect("send_with_attachments should stop after cancellation")
3882 .expect("worker should not panic");
3883 assert!(result.is_err());
3884 assert_eq!(run.status().await, Some(crate::run::RunStatus::Cancelled));
3885 assert!(session.history().is_empty());
3886 assert!(!session.cancel().await);
3887 }
3888
3889 #[tokio::test]
3890 async fn test_cancel_run_only_cancels_matching_current_run() {
3891 let agent = Agent::from_config(test_config()).await.unwrap();
3892 let session = Arc::new(
3893 agent
3894 .build_session(
3895 "/tmp/test-cancel-run-by-id".into(),
3896 Arc::new(CancellableStreamingClient::new("partial answer")),
3897 &SessionOptions::new(),
3898 )
3899 .unwrap(),
3900 );
3901 let worker_session = Arc::clone(&session);
3902 let worker = tokio::spawn(async move { worker_session.send("hello", None).await });
3903
3904 let mut run_id = None;
3905 for _ in 0..20 {
3906 if let Some(current) = session.current_run().await {
3907 run_id = Some(current.id().to_string());
3908 break;
3909 }
3910 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3911 }
3912 let run_id = run_id.expect("current run should be visible");
3913
3914 assert!(!session.cancel_run("stale-run").await);
3915 assert!(session.cancel_run(&run_id).await);
3916
3917 let result = tokio::time::timeout(std::time::Duration::from_secs(1), worker)
3918 .await
3919 .expect("send should stop after cancellation")
3920 .expect("worker should not panic");
3921 assert!(result.is_err());
3922 assert_eq!(
3923 session.run_snapshot(&run_id).await.unwrap().status,
3924 crate::run::RunStatus::Cancelled
3925 );
3926 assert!(!session.cancel_run(&run_id).await);
3927 }
3928
3929 #[tokio::test]
3930 async fn test_send_with_attachments_passes_session_id_to_context_providers() {
3931 let provider = Arc::new(CapturingContextProvider::default());
3932 let agent = Agent::from_config(test_config()).await.unwrap();
3933 let opts = SessionOptions::new()
3934 .with_session_id("attachments-context-session")
3935 .with_context_provider(provider.clone());
3936 let session = agent
3937 .build_session(
3938 "/tmp/test-send-attachments-context".into(),
3939 Arc::new(StaticStreamingClient::new("attachment answer")),
3940 &opts,
3941 )
3942 .unwrap();
3943 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3944
3945 session
3946 .send_with_attachments("hello", &attachments, None)
3947 .await
3948 .unwrap();
3949
3950 let session_ids = provider.session_ids.lock().unwrap();
3951 assert!(!session_ids.is_empty());
3952 assert!(session_ids
3953 .iter()
3954 .all(|id| id.as_deref() == Some("attachments-context-session")));
3955 }
3956
3957 #[tokio::test]
3958 async fn test_send_records_run_snapshot_and_events() {
3959 let agent = Agent::from_config(test_config()).await.unwrap();
3960 let session = agent
3961 .build_session(
3962 "/tmp/test-send-run-store".into(),
3963 Arc::new(StaticStreamingClient::new("run answer")),
3964 &SessionOptions::new(),
3965 )
3966 .unwrap();
3967
3968 let result = session.send("hello", None).await.unwrap();
3969 assert_eq!(result.text, "run answer");
3970
3971 let runs = session.runs().await;
3972 assert_eq!(runs.len(), 1);
3973 assert_eq!(runs[0].status, crate::run::RunStatus::Completed);
3974 assert_eq!(runs[0].result_text.as_deref(), Some("run answer"));
3975
3976 let events = session.run_events(&runs[0].id).await;
3977 assert!(events
3978 .iter()
3979 .any(|record| matches!(record.event, AgentEvent::Start { .. })));
3980 assert!(events
3981 .iter()
3982 .any(|record| matches!(record.event, AgentEvent::End { .. })));
3983 }
3984
3985 #[tokio::test]
3986 async fn test_send_publishes_runtime_events_to_hook_executor() {
3987 let hook = Arc::new(RecordingRuntimeHook::default());
3988 let agent = Agent::from_config(test_config()).await.unwrap();
3989 let opts = SessionOptions::new().with_hook_executor(hook.clone());
3990 let session = agent
3991 .build_session(
3992 "/tmp/test-runtime-event-hook".into(),
3993 Arc::new(StaticStreamingClient::new("hooked answer")),
3994 &opts,
3995 )
3996 .unwrap();
3997
3998 session.send("hello", None).await.unwrap();
3999
4000 let events = hook.events.lock().unwrap();
4001 assert!(events
4002 .iter()
4003 .any(|(_, session_id, event)| session_id == session.id()
4004 && matches!(event, AgentEvent::Start { .. })));
4005 assert!(events
4006 .iter()
4007 .any(|(_, session_id, event)| session_id == session.id()
4008 && matches!(event, AgentEvent::End { .. })));
4009 assert!(events
4010 .iter()
4011 .all(|(run_id, _, _)| run_id.starts_with("run-")));
4012 }
4013
4014 #[tokio::test]
4015 async fn test_stream_exposes_current_run_handle_and_replay() {
4016 let agent = Agent::from_config(test_config()).await.unwrap();
4017 let session = agent
4018 .build_session(
4019 "/tmp/test-stream-run-handle".into(),
4020 Arc::new(CancellableStreamingClient::new("partial answer")),
4021 &SessionOptions::new(),
4022 )
4023 .unwrap();
4024
4025 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
4026 let mut saw_delta = false;
4027 for _ in 0..16 {
4028 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
4029 .await
4030 .expect("stream event before timeout")
4031 .expect("stream emits event");
4032 if matches!(event, AgentEvent::TextDelta { .. }) {
4033 saw_delta = true;
4034 break;
4035 }
4036 }
4037 assert!(saw_delta);
4038
4039 let run = session.current_run().await.expect("current run handle");
4040 assert_eq!(run.session_id(), session.id());
4041 assert!(matches!(
4042 run.status().await,
4043 Some(crate::run::RunStatus::Executing | crate::run::RunStatus::Planning)
4044 ));
4045 assert!(run.cancel().await);
4046
4047 while rx.recv().await.is_some() {}
4048 handle.await.unwrap();
4049
4050 let snapshot = run
4051 .snapshot()
4052 .await
4053 .expect("run snapshot remains replayable");
4054 assert_eq!(snapshot.status, crate::run::RunStatus::Cancelled);
4055 assert!(!run.events().await.is_empty());
4056 }
4057
4058 #[tokio::test]
4059 async fn test_session_options_with_agent_dir() {
4060 let opts = SessionOptions::new()
4061 .with_agent_dir("/tmp/agents")
4062 .with_agent_dir("/tmp/more-agents");
4063 assert_eq!(opts.agent_dirs.len(), 2);
4064 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
4065 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
4066 }
4067
4068 #[test]
4073 fn test_session_options_with_queue_config() {
4074 let qc = SessionQueueConfig::default().with_lane_features();
4075 let opts = SessionOptions::new().with_queue_config(qc.clone());
4076 assert!(opts.queue_config.is_some());
4077
4078 let config = opts.queue_config.unwrap();
4079 assert!(config.enable_dlq);
4080 assert!(config.enable_metrics);
4081 assert!(config.enable_alerts);
4082 assert_eq!(config.default_timeout_ms, Some(60_000));
4083 }
4084
4085 #[tokio::test]
4086 async fn test_session_uses_single_delegation_tool_surface() {
4087 let agent = Agent::from_config(test_config()).await.unwrap();
4088 let session = agent
4089 .session("/tmp/test-workspace-delegation-tools", None)
4090 .unwrap();
4091 let names = session.tool_names();
4092
4093 assert!(names.contains(&"task".to_string()));
4094 assert!(names.contains(&"parallel_task".to_string()));
4095 assert!(!names.contains(&"run_team".to_string()));
4096 }
4097
4098 #[tokio::test(flavor = "multi_thread")]
4099 async fn test_session_with_queue_config() {
4100 let agent = Agent::from_config(test_config()).await.unwrap();
4101 let qc = SessionQueueConfig::default();
4102 let opts = SessionOptions::new().with_queue_config(qc);
4103 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
4104 assert!(session.is_ok());
4105 let session = session.unwrap();
4106 assert!(session.has_queue());
4107 }
4108
4109 #[tokio::test]
4110 async fn test_session_without_queue_config() {
4111 let agent = Agent::from_config(test_config()).await.unwrap();
4112 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
4113 assert!(!session.has_queue());
4114 }
4115
4116 #[tokio::test]
4117 async fn test_session_queue_stats_without_queue() {
4118 let agent = Agent::from_config(test_config()).await.unwrap();
4119 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
4120 let stats = session.queue_stats().await;
4121 assert_eq!(stats.total_pending, 0);
4123 assert_eq!(stats.total_active, 0);
4124 }
4125
4126 #[tokio::test(flavor = "multi_thread")]
4127 async fn test_session_queue_stats_with_queue() {
4128 let agent = Agent::from_config(test_config()).await.unwrap();
4129 let qc = SessionQueueConfig::default();
4130 let opts = SessionOptions::new().with_queue_config(qc);
4131 let session = agent
4132 .session("/tmp/test-workspace-qstats", Some(opts))
4133 .unwrap();
4134 let stats = session.queue_stats().await;
4135 assert_eq!(stats.total_pending, 0);
4137 assert_eq!(stats.total_active, 0);
4138 }
4139
4140 #[tokio::test(flavor = "multi_thread")]
4141 async fn test_session_pending_external_tasks_empty() {
4142 let agent = Agent::from_config(test_config()).await.unwrap();
4143 let qc = SessionQueueConfig::default();
4144 let opts = SessionOptions::new().with_queue_config(qc);
4145 let session = agent
4146 .session("/tmp/test-workspace-ext", Some(opts))
4147 .unwrap();
4148 let tasks = session.pending_external_tasks().await;
4149 assert!(tasks.is_empty());
4150 }
4151
4152 #[tokio::test(flavor = "multi_thread")]
4153 async fn test_session_dead_letters_empty() {
4154 let agent = Agent::from_config(test_config()).await.unwrap();
4155 let qc = SessionQueueConfig::default().with_dlq(Some(100));
4156 let opts = SessionOptions::new().with_queue_config(qc);
4157 let session = agent
4158 .session("/tmp/test-workspace-dlq", Some(opts))
4159 .unwrap();
4160 let dead = session.dead_letters().await;
4161 assert!(dead.is_empty());
4162 }
4163
4164 #[tokio::test(flavor = "multi_thread")]
4165 async fn test_session_queue_metrics_disabled() {
4166 let agent = Agent::from_config(test_config()).await.unwrap();
4167 let qc = SessionQueueConfig::default();
4169 let opts = SessionOptions::new().with_queue_config(qc);
4170 let session = agent
4171 .session("/tmp/test-workspace-nomet", Some(opts))
4172 .unwrap();
4173 let metrics = session.queue_metrics().await;
4174 assert!(metrics.is_none());
4175 }
4176
4177 #[tokio::test(flavor = "multi_thread")]
4178 async fn test_session_queue_metrics_enabled() {
4179 let agent = Agent::from_config(test_config()).await.unwrap();
4180 let qc = SessionQueueConfig::default().with_metrics();
4181 let opts = SessionOptions::new().with_queue_config(qc);
4182 let session = agent
4183 .session("/tmp/test-workspace-met", Some(opts))
4184 .unwrap();
4185 let metrics = session.queue_metrics().await;
4186 assert!(metrics.is_some());
4187 }
4188
4189 #[tokio::test(flavor = "multi_thread")]
4190 async fn test_session_set_lane_handler() {
4191 let agent = Agent::from_config(test_config()).await.unwrap();
4192 let qc = SessionQueueConfig::default();
4193 let opts = SessionOptions::new().with_queue_config(qc);
4194 let session = agent
4195 .session("/tmp/test-workspace-handler", Some(opts))
4196 .unwrap();
4197
4198 session
4200 .set_lane_handler(
4201 SessionLane::Execute,
4202 LaneHandlerConfig {
4203 mode: crate::queue::TaskHandlerMode::External,
4204 timeout_ms: 30_000,
4205 },
4206 )
4207 .await;
4208
4209 }
4212
4213 #[tokio::test(flavor = "multi_thread")]
4218 async fn test_session_has_id() {
4219 let agent = Agent::from_config(test_config()).await.unwrap();
4220 let session = agent.session("/tmp/test-ws-id", None).unwrap();
4221 assert!(!session.session_id().is_empty());
4223 assert_eq!(session.session_id().len(), 36); }
4225
4226 #[tokio::test(flavor = "multi_thread")]
4227 async fn test_session_explicit_id() {
4228 let agent = Agent::from_config(test_config()).await.unwrap();
4229 let opts = SessionOptions::new().with_session_id("my-session-42");
4230 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
4231 assert_eq!(session.session_id(), "my-session-42");
4232 }
4233
4234 #[tokio::test(flavor = "multi_thread")]
4235 async fn test_session_artifact_store_limits_option() {
4236 let agent = Agent::from_config(test_config()).await.unwrap();
4237 let opts =
4238 SessionOptions::new().with_artifact_store_limits(crate::tools::ArtifactStoreLimits {
4239 max_artifacts: 3,
4240 max_bytes: 4096,
4241 });
4242 let session = agent
4243 .session("/tmp/test-ws-artifact-limits", Some(opts))
4244 .unwrap();
4245
4246 let limits = session.tool_executor.artifact_store().limits();
4247 assert_eq!(limits.max_artifacts, 3);
4248 assert_eq!(limits.max_bytes, 4096);
4249 }
4250
4251 #[tokio::test(flavor = "multi_thread")]
4252 async fn test_session_save_no_store() {
4253 let agent = Agent::from_config(test_config()).await.unwrap();
4254 let session = agent.session("/tmp/test-ws-save", None).unwrap();
4255 session.save().await.unwrap();
4257 }
4258
4259 #[tokio::test(flavor = "multi_thread")]
4260 async fn test_session_save_and_load() {
4261 let store = Arc::new(crate::store::MemorySessionStore::new());
4262 let agent = Agent::from_config(test_config()).await.unwrap();
4263
4264 let opts = SessionOptions::new()
4265 .with_session_store(store.clone())
4266 .with_session_id("persist-test");
4267 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
4268
4269 session.save().await.unwrap();
4271
4272 assert!(store.exists("persist-test").await.unwrap());
4274
4275 let data = store.load("persist-test").await.unwrap().unwrap();
4276 assert_eq!(data.id, "persist-test");
4277 assert!(data.messages.is_empty());
4278 }
4279
4280 #[tokio::test(flavor = "multi_thread")]
4281 async fn test_session_save_with_history() {
4282 let store = Arc::new(crate::store::MemorySessionStore::new());
4283 let agent = Agent::from_config(test_config()).await.unwrap();
4284
4285 let opts = SessionOptions::new()
4286 .with_session_store(store.clone())
4287 .with_session_id("history-test");
4288 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
4289
4290 {
4292 let mut h = session.history.write().unwrap();
4293 h.push(Message::user("Hello"));
4294 h.push(Message::user("How are you?"));
4295 }
4296
4297 session.save().await.unwrap();
4298
4299 let data = store.load("history-test").await.unwrap().unwrap();
4300 assert_eq!(data.messages.len(), 2);
4301 }
4302
4303 #[tokio::test(flavor = "multi_thread")]
4304 async fn test_resume_session() {
4305 let store = Arc::new(crate::store::MemorySessionStore::new());
4306 let agent = Agent::from_config(test_config()).await.unwrap();
4307
4308 let opts = SessionOptions::new()
4310 .with_session_store(store.clone())
4311 .with_session_id("resume-test");
4312 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
4313 {
4314 let mut h = session.history.write().unwrap();
4315 h.push(Message::user("What is Rust?"));
4316 h.push(Message::user("Tell me more"));
4317 }
4318 session.save().await.unwrap();
4319
4320 let opts2 = SessionOptions::new().with_session_store(store.clone());
4322 let resumed = agent.resume_session("resume-test", opts2).unwrap();
4323
4324 assert_eq!(resumed.session_id(), "resume-test");
4325 let history = resumed.history();
4326 assert_eq!(history.len(), 2);
4327 assert_eq!(history[0].text(), "What is Rust?");
4328 }
4329
4330 #[tokio::test(flavor = "multi_thread")]
4331 async fn test_resume_session_restores_artifacts() {
4332 let store = Arc::new(crate::store::MemorySessionStore::new());
4333 let agent = Agent::from_config(test_config()).await.unwrap();
4334
4335 let opts = SessionOptions::new()
4336 .with_session_store(store.clone())
4337 .with_session_id("resume-artifacts-test");
4338 let session = agent.session("/tmp/test-ws-artifacts", Some(opts)).unwrap();
4339 session
4340 .tool_executor
4341 .artifact_store()
4342 .put(crate::tools::ToolArtifact {
4343 artifact_id: "tool-output:test:a".to_string(),
4344 artifact_uri: "a3s://tool-output/test/a".to_string(),
4345 tool_name: "test".to_string(),
4346 content: "artifact content".to_string(),
4347 original_bytes: 16,
4348 shown_bytes: 4,
4349 });
4350
4351 session.save().await.unwrap();
4352 let opts2 = SessionOptions::new().with_session_store(store.clone());
4353 let resumed = agent
4354 .resume_session("resume-artifacts-test", opts2)
4355 .unwrap();
4356
4357 let artifact = resumed
4358 .get_artifact("a3s://tool-output/test/a")
4359 .expect("artifact");
4360 assert_eq!(artifact.content, "artifact content");
4361 }
4362
4363 #[tokio::test(flavor = "multi_thread")]
4364 async fn test_resume_session_restores_trace_events() {
4365 let store = Arc::new(crate::store::MemorySessionStore::new());
4366 let agent = Agent::from_config(test_config()).await.unwrap();
4367 let event = crate::trace::TraceEvent::tool_execution(
4368 "read",
4369 true,
4370 0,
4371 std::time::Duration::from_millis(3),
4372 32,
4373 Some(&serde_json::json!({
4374 "artifact": {
4375 "artifact_uri": "a3s://tool-output/read/abc"
4376 }
4377 })),
4378 );
4379
4380 let opts = SessionOptions::new()
4381 .with_session_store(store.clone())
4382 .with_session_id("resume-trace-test");
4383 let session = agent.session("/tmp/test-ws-trace", Some(opts)).unwrap();
4384 session.trace_sink.replace_events(vec![event.clone()]);
4385 session.save().await.unwrap();
4386
4387 let opts2 = SessionOptions::new().with_session_store(store.clone());
4388 let resumed = agent.resume_session("resume-trace-test", opts2).unwrap();
4389
4390 assert_eq!(resumed.trace_events(), vec![event]);
4391 }
4392
4393 #[tokio::test(flavor = "multi_thread")]
4394 async fn test_resume_session_restores_run_records() {
4395 let store = Arc::new(crate::store::MemorySessionStore::new());
4396 let agent = Agent::from_config(test_config()).await.unwrap();
4397
4398 let opts = SessionOptions::new()
4399 .with_session_store(store.clone())
4400 .with_session_id("resume-runs-test");
4401 let session = agent.session("/tmp/test-ws-runs", Some(opts)).unwrap();
4402 let run = session
4403 .run_store
4404 .create_run(session.session_id(), "persist run")
4405 .await;
4406 session
4407 .run_store
4408 .record_event(
4409 &run.id,
4410 AgentEvent::Start {
4411 prompt: "persist run".to_string(),
4412 },
4413 )
4414 .await;
4415 session.save().await.unwrap();
4416
4417 let opts2 = SessionOptions::new().with_session_store(store.clone());
4418 let resumed = agent.resume_session("resume-runs-test", opts2).unwrap();
4419
4420 let runs = resumed.runs().await;
4421 assert_eq!(runs.len(), 1);
4422 assert_eq!(runs[0].prompt, "persist run");
4423 assert_eq!(resumed.run_events(&run.id).await.len(), 1);
4424 }
4425
4426 #[tokio::test(flavor = "multi_thread")]
4427 async fn test_resume_session_restores_verification_reports() {
4428 let store = Arc::new(crate::store::MemorySessionStore::new());
4429 let agent = Agent::from_config(test_config()).await.unwrap();
4430 let report = crate::verification::VerificationReport::new(
4431 "program:test",
4432 vec![crate::verification::VerificationCheck::required(
4433 "check:test",
4434 "test",
4435 "Run tests",
4436 )
4437 .with_status(crate::verification::VerificationStatus::Passed)],
4438 );
4439
4440 let opts = SessionOptions::new()
4441 .with_session_store(store.clone())
4442 .with_session_id("resume-verification-test");
4443 let session = agent
4444 .session("/tmp/test-ws-verification", Some(opts))
4445 .unwrap();
4446 session.record_verification_reports([report.clone()]);
4447 session.save().await.unwrap();
4448
4449 let opts2 = SessionOptions::new().with_session_store(store.clone());
4450 let resumed = agent
4451 .resume_session("resume-verification-test", opts2)
4452 .unwrap();
4453
4454 assert_eq!(resumed.verification_reports(), vec![report]);
4455 assert_eq!(
4456 resumed.verification_summary().status,
4457 crate::verification::VerificationStatus::Passed
4458 );
4459 assert!(resumed
4460 .verification_summary_text()
4461 .contains("Verification passed"));
4462 }
4463
4464 #[tokio::test(flavor = "multi_thread")]
4465 async fn test_verify_commands_builds_report_from_bash_results() {
4466 let temp_dir = tempfile::tempdir().unwrap();
4467 let agent = Agent::from_config(test_config()).await.unwrap();
4468 let session = agent
4469 .session(temp_dir.path().display().to_string(), None)
4470 .unwrap();
4471 let commands = vec![
4472 crate::verification::VerificationCommand::required(
4473 "check:smoke",
4474 "smoke",
4475 "Run smoke command",
4476 "printf ok",
4477 ),
4478 crate::verification::VerificationCommand::required(
4479 "check:failure",
4480 "smoke",
4481 "Run failing command",
4482 "exit 7",
4483 ),
4484 ];
4485
4486 let report = session.verify_commands("turn", &commands).await.unwrap();
4487
4488 assert_eq!(report.subject, "turn");
4489 assert_eq!(
4490 report.status,
4491 crate::verification::VerificationStatus::Failed
4492 );
4493 assert_eq!(
4494 report.checks[0].status,
4495 crate::verification::VerificationStatus::Passed
4496 );
4497 assert_eq!(
4498 report.checks[1].status,
4499 crate::verification::VerificationStatus::Failed
4500 );
4501 assert_eq!(
4502 report.checks[1].residual_risk.as_deref(),
4503 Some("verification command exited with code 7: exit 7")
4504 );
4505 assert_eq!(session.verification_reports(), vec![report]);
4506 assert_eq!(
4507 session.verification_summary().status,
4508 crate::verification::VerificationStatus::Failed
4509 );
4510 }
4511
4512 #[tokio::test(flavor = "multi_thread")]
4513 async fn test_session_verification_presets_reflect_workspace() {
4514 let temp_dir = tempfile::tempdir().unwrap();
4515 std::fs::write(
4516 temp_dir.path().join("package.json"),
4517 r#"{"scripts":{"test":"vitest","typecheck":"tsc --noEmit"}}"#,
4518 )
4519 .unwrap();
4520 let agent = Agent::from_config(test_config()).await.unwrap();
4521 let session = agent
4522 .session(temp_dir.path().display().to_string(), None)
4523 .unwrap();
4524
4525 let presets = session.verification_presets();
4526
4527 assert_eq!(presets.len(), 1);
4528 assert_eq!(presets[0].project_kind, "node");
4529 assert_eq!(presets[0].commands[0].command, "npm test");
4530 assert_eq!(presets[0].commands[1].command, "npm run typecheck");
4531 }
4532
4533 #[tokio::test(flavor = "multi_thread")]
4534 async fn test_resume_session_not_found() {
4535 let store = Arc::new(crate::store::MemorySessionStore::new());
4536 let agent = Agent::from_config(test_config()).await.unwrap();
4537
4538 let opts = SessionOptions::new().with_session_store(store.clone());
4539 let result = agent.resume_session("nonexistent", opts);
4540 assert!(result.is_err());
4541 assert!(result.unwrap_err().to_string().contains("not found"));
4542 }
4543
4544 #[tokio::test(flavor = "multi_thread")]
4545 async fn test_resume_session_no_store() {
4546 let agent = Agent::from_config(test_config()).await.unwrap();
4547 let opts = SessionOptions::new();
4548 let result = agent.resume_session("any-id", opts);
4549 assert!(result.is_err());
4550 assert!(result.unwrap_err().to_string().contains("session_store"));
4551 }
4552
4553 #[tokio::test(flavor = "multi_thread")]
4554 async fn test_file_session_store_persistence() {
4555 let dir = tempfile::TempDir::new().unwrap();
4556 let store = Arc::new(
4557 crate::store::FileSessionStore::new(dir.path())
4558 .await
4559 .unwrap(),
4560 );
4561 let agent = Agent::from_config(test_config()).await.unwrap();
4562
4563 let opts = SessionOptions::new()
4565 .with_session_store(store.clone())
4566 .with_session_id("file-persist");
4567 let session = agent
4568 .session("/tmp/test-ws-file-persist", Some(opts))
4569 .unwrap();
4570 {
4571 let mut h = session.history.write().unwrap();
4572 h.push(Message::user("test message"));
4573 }
4574 session.save().await.unwrap();
4575
4576 let store2 = Arc::new(
4578 crate::store::FileSessionStore::new(dir.path())
4579 .await
4580 .unwrap(),
4581 );
4582 let data = store2.load("file-persist").await.unwrap().unwrap();
4583 assert_eq!(data.messages.len(), 1);
4584 }
4585
4586 #[tokio::test(flavor = "multi_thread")]
4587 async fn test_session_options_builders() {
4588 let opts = SessionOptions::new()
4589 .with_session_id("test-id")
4590 .with_auto_save(true);
4591 assert_eq!(opts.session_id, Some("test-id".to_string()));
4592 assert!(opts.auto_save);
4593 }
4594
4595 #[tokio::test(flavor = "multi_thread")]
4600 async fn test_session_with_memory_store() {
4601 use a3s_memory::InMemoryStore;
4602 let store = Arc::new(InMemoryStore::new());
4603 let agent = Agent::from_config(test_config()).await.unwrap();
4604 let opts = SessionOptions::new().with_memory(store);
4605 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
4606 assert!(session.memory().is_some());
4607 }
4608
4609 #[tokio::test(flavor = "multi_thread")]
4610 async fn test_session_without_memory_store() {
4611 let agent = Agent::from_config(test_config()).await.unwrap();
4612 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
4613 assert!(session.memory().is_none());
4614 }
4615
4616 #[tokio::test(flavor = "multi_thread")]
4617 async fn test_session_memory_wired_into_config() {
4618 use a3s_memory::InMemoryStore;
4619 let store = Arc::new(InMemoryStore::new());
4620 let agent = Agent::from_config(test_config()).await.unwrap();
4621 let opts = SessionOptions::new().with_memory(store);
4622 let session = agent
4623 .session("/tmp/test-ws-mem-config", Some(opts))
4624 .unwrap();
4625 assert!(session.memory().is_some());
4627 }
4628
4629 #[tokio::test(flavor = "multi_thread")]
4630 async fn test_session_with_file_memory() {
4631 let dir = tempfile::TempDir::new().unwrap();
4632 let agent = Agent::from_config(test_config()).await.unwrap();
4633 let opts = SessionOptions::new().with_file_memory(dir.path());
4634 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
4635 assert!(session.memory().is_some());
4636 }
4637
4638 #[tokio::test(flavor = "multi_thread")]
4639 async fn test_memory_remember_and_recall() {
4640 use a3s_memory::InMemoryStore;
4641 let store = Arc::new(InMemoryStore::new());
4642 let agent = Agent::from_config(test_config()).await.unwrap();
4643 let opts = SessionOptions::new().with_memory(store);
4644 let session = agent
4645 .session("/tmp/test-ws-mem-recall", Some(opts))
4646 .unwrap();
4647
4648 let memory = session.memory().unwrap();
4649 memory
4650 .remember_success("write a file", &["write".to_string()], "done")
4651 .await
4652 .unwrap();
4653
4654 let results = memory.recall_similar("write", 5).await.unwrap();
4655 assert!(!results.is_empty());
4656 let stats = memory.stats().await.unwrap();
4657 assert_eq!(stats.long_term_count, 1);
4658 }
4659
4660 #[tokio::test(flavor = "multi_thread")]
4665 async fn test_session_tool_timeout_configured() {
4666 let agent = Agent::from_config(test_config()).await.unwrap();
4667 let opts = SessionOptions::new().with_tool_timeout(5000);
4668 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
4669 assert!(!session.id().is_empty());
4670 }
4671
4672 #[tokio::test(flavor = "multi_thread")]
4677 async fn test_session_without_queue_builds_ok() {
4678 let agent = Agent::from_config(test_config()).await.unwrap();
4679 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
4680 assert!(!session.id().is_empty());
4681 }
4682
4683 #[tokio::test(flavor = "multi_thread")]
4688 async fn test_concurrent_history_reads() {
4689 let agent = Agent::from_config(test_config()).await.unwrap();
4690 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
4691
4692 let handles: Vec<_> = (0..10)
4693 .map(|_| {
4694 let s = Arc::clone(&session);
4695 tokio::spawn(async move { s.history().len() })
4696 })
4697 .collect();
4698
4699 for h in handles {
4700 h.await.unwrap();
4701 }
4702 }
4703
4704 #[tokio::test(flavor = "multi_thread")]
4709 async fn test_session_no_init_warning_without_file_memory() {
4710 let agent = Agent::from_config(test_config()).await.unwrap();
4711 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
4712 assert!(session.init_warning().is_none());
4713 }
4714
4715 #[tokio::test(flavor = "multi_thread")]
4716 async fn test_register_agent_dir_loads_agents_into_live_session() {
4717 let temp_dir = tempfile::tempdir().unwrap();
4718
4719 std::fs::write(
4721 temp_dir.path().join("my-agent.yaml"),
4722 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
4723 )
4724 .unwrap();
4725
4726 let agent = Agent::from_config(test_config()).await.unwrap();
4727 let session = agent.session(".", None).unwrap();
4728
4729 assert!(!session.agent_registry.exists("my-dynamic-agent"));
4731
4732 let count = session.register_agent_dir(temp_dir.path());
4733 assert_eq!(count, 1);
4734 assert!(session.agent_registry.exists("my-dynamic-agent"));
4735 }
4736
4737 #[tokio::test(flavor = "multi_thread")]
4738 async fn test_register_agent_dir_empty_dir_returns_zero() {
4739 let temp_dir = tempfile::tempdir().unwrap();
4740 let agent = Agent::from_config(test_config()).await.unwrap();
4741 let session = agent.session(".", None).unwrap();
4742 let count = session.register_agent_dir(temp_dir.path());
4743 assert_eq!(count, 0);
4744 }
4745
4746 #[tokio::test(flavor = "multi_thread")]
4747 async fn test_register_agent_dir_nonexistent_returns_zero() {
4748 let agent = Agent::from_config(test_config()).await.unwrap();
4749 let session = agent.session(".", None).unwrap();
4750 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
4751 assert_eq!(count, 0);
4752 }
4753
4754 #[tokio::test(flavor = "multi_thread")]
4755 async fn test_session_with_mcp_manager_builds_ok() {
4756 use crate::mcp::manager::McpManager;
4757 let mcp = Arc::new(McpManager::new());
4758 let agent = Agent::from_config(test_config()).await.unwrap();
4759 let opts = SessionOptions::new().with_mcp(mcp);
4760 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
4762 assert!(!session.id().is_empty());
4763 }
4764
/// Compile-time check: `SessionCommand` must be importable from
/// `crate::queue` and usable as a trait object. Nothing runs at runtime.
#[test]
fn test_session_command_is_available_from_queue_module() {
    use crate::queue::SessionCommand;
    use std::marker::PhantomData;

    // Naming the type is enough; the test fails to compile if the path or
    // the trait-object form stops being valid.
    let _: PhantomData<Box<dyn SessionCommand>> = PhantomData;
}
4771}