1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{CommandAction, CommandContext, CommandRegistry};
21use crate::config::CodeConfig;
22use crate::context::{ContextItem, ContextType, StaticContextProvider};
23use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
24use crate::hitl::PendingConfirmationInfo;
25use crate::llm::{LlmClient, Message};
26use crate::prompts::{PlanningMode, SystemPromptSlots};
27use crate::queue::{
28 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
29 SessionQueueStats,
30};
31use crate::session_lane_queue::SessionLaneQueue;
32use crate::text::truncate_utf8;
33use crate::tools::{ToolContext, ToolExecutor};
34use a3s_lane::{DeadLetter, MetricsSnapshot};
35use a3s_memory::{FileMemoryStore, MemoryStore};
36use anyhow::Context;
37use std::collections::HashMap;
38use std::path::{Path, PathBuf};
39use std::sync::{Arc, RwLock};
40use tokio::sync::{broadcast, mpsc};
41use tokio::task::JoinHandle;
42
43fn safe_canonicalize(path: &Path) -> PathBuf {
46 match std::fs::canonicalize(path) {
47 Ok(p) => strip_unc_prefix(p),
48 Err(_) => path.to_path_buf(),
49 }
50}
51
52fn strip_unc_prefix(path: PathBuf) -> PathBuf {
55 #[cfg(windows)]
56 {
57 let s = path.to_string_lossy();
58 if let Some(stripped) = s.strip_prefix(r"\\?\") {
59 return PathBuf::from(stripped);
60 }
61 }
62 path
63}
64
65#[derive(Debug, Clone)]
71pub struct ToolCallResult {
72 pub name: String,
73 pub output: String,
74 pub exit_code: i32,
75 pub metadata: Option<serde_json::Value>,
76}
77
78#[derive(Clone, Default)]
84pub struct SessionOptions {
85 pub model: Option<String>,
87 pub agent_dirs: Vec<PathBuf>,
90 pub queue_config: Option<SessionQueueConfig>,
95 pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
97 pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
99 pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
101 pub confirmation_policy: Option<crate::hitl::ConfirmationPolicy>,
103 pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
105 pub planning_mode: PlanningMode,
107 pub goal_tracking: bool,
109 pub skill_dirs: Vec<PathBuf>,
112 pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
114 pub memory_store: Option<Arc<dyn MemoryStore>>,
116 pub(crate) file_memory_dir: Option<PathBuf>,
118 pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
120 pub session_id: Option<String>,
122 pub auto_save: bool,
124 pub artifact_store_limits: Option<crate::tools::ArtifactStoreLimits>,
126 pub max_parse_retries: Option<u32>,
129 pub tool_timeout_ms: Option<u64>,
132 pub circuit_breaker_threshold: Option<u32>,
136 pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
142 pub auto_compact: bool,
144 pub auto_compact_threshold: Option<f32>,
147 pub continuation_enabled: Option<bool>,
150 pub max_continuation_turns: Option<u32>,
153 pub max_execution_time_ms: Option<u64>,
157 pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
162 pub temperature: Option<f32>,
164 pub thinking_budget: Option<usize>,
166 pub max_tool_rounds: Option<usize>,
172 pub prompt_slots: Option<SystemPromptSlots>,
178 pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
185}
186
187impl std::fmt::Debug for SessionOptions {
188 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
189 f.debug_struct("SessionOptions")
190 .field("model", &self.model)
191 .field("agent_dirs", &self.agent_dirs)
192 .field("skill_dirs", &self.skill_dirs)
193 .field("queue_config", &self.queue_config)
194 .field("security_provider", &self.security_provider.is_some())
195 .field("context_providers", &self.context_providers.len())
196 .field("confirmation_manager", &self.confirmation_manager.is_some())
197 .field("permission_checker", &self.permission_checker.is_some())
198 .field("planning_mode", &self.planning_mode)
199 .field("goal_tracking", &self.goal_tracking)
200 .field(
201 "skill_registry",
202 &self
203 .skill_registry
204 .as_ref()
205 .map(|r| format!("{} skills", r.len())),
206 )
207 .field("memory_store", &self.memory_store.is_some())
208 .field("session_store", &self.session_store.is_some())
209 .field("session_id", &self.session_id)
210 .field("auto_save", &self.auto_save)
211 .field("artifact_store_limits", &self.artifact_store_limits)
212 .field("max_parse_retries", &self.max_parse_retries)
213 .field("tool_timeout_ms", &self.tool_timeout_ms)
214 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
215 .field("sandbox_handle", &self.sandbox_handle.is_some())
216 .field("auto_compact", &self.auto_compact)
217 .field("auto_compact_threshold", &self.auto_compact_threshold)
218 .field("continuation_enabled", &self.continuation_enabled)
219 .field("max_continuation_turns", &self.max_continuation_turns)
220 .field("mcp_manager", &self.mcp_manager.is_some())
221 .field("temperature", &self.temperature)
222 .field("thinking_budget", &self.thinking_budget)
223 .field("max_tool_rounds", &self.max_tool_rounds)
224 .field("prompt_slots", &self.prompt_slots.is_some())
225 .finish()
226 }
227}
228
229impl SessionOptions {
230 pub fn new() -> Self {
231 Self::default()
232 }
233
234 pub fn with_model(mut self, model: impl Into<String>) -> Self {
235 self.model = Some(model.into());
236 self
237 }
238
239 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
240 self.agent_dirs.push(dir.into());
241 self
242 }
243
244 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
245 self.queue_config = Some(config);
246 self
247 }
248
249 pub fn with_default_security(mut self) -> Self {
251 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
252 self
253 }
254
255 pub fn with_security_provider(
257 mut self,
258 provider: Arc<dyn crate::security::SecurityProvider>,
259 ) -> Self {
260 self.security_provider = Some(provider);
261 self
262 }
263
264 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
266 let config = crate::context::FileSystemContextConfig::new(root_path);
267 self.context_providers
268 .push(Arc::new(crate::context::FileSystemContextProvider::new(
269 config,
270 )));
271 self
272 }
273
274 pub fn with_context_provider(
276 mut self,
277 provider: Arc<dyn crate::context::ContextProvider>,
278 ) -> Self {
279 self.context_providers.push(provider);
280 self
281 }
282
283 pub fn with_confirmation_manager(
285 mut self,
286 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
287 ) -> Self {
288 self.confirmation_manager = Some(manager);
289 self
290 }
291
292 pub fn with_confirmation_policy(mut self, policy: crate::hitl::ConfirmationPolicy) -> Self {
297 self.confirmation_policy = Some(policy);
298 self
299 }
300
301 pub fn with_permission_checker(
303 mut self,
304 checker: Arc<dyn crate::permissions::PermissionChecker>,
305 ) -> Self {
306 self.permission_checker = Some(checker);
307 self
308 }
309
310 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
312 self.planning_mode = mode;
313 self
314 }
315
316 pub fn with_planning(mut self, enabled: bool) -> Self {
318 self.planning_mode = if enabled {
319 PlanningMode::Enabled
320 } else {
321 PlanningMode::Disabled
322 };
323 self
324 }
325
326 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
328 self.goal_tracking = enabled;
329 self
330 }
331
332 pub fn with_builtin_skills(mut self) -> Self {
334 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
335 self
336 }
337
338 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
340 self.skill_registry = Some(registry);
341 self
342 }
343
344 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
347 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
348 self
349 }
350
351 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
353 let registry = self
354 .skill_registry
355 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
356 if let Err(e) = registry.load_from_dir(&dir) {
357 tracing::warn!(
358 dir = %dir.as_ref().display(),
359 error = %e,
360 "Failed to load skills from directory — continuing without them"
361 );
362 }
363 self.skill_registry = Some(registry);
364 self
365 }
366
367 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
369 self.memory_store = Some(store);
370 self
371 }
372
373 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
379 self.file_memory_dir = Some(dir.into());
380 self
381 }
382
383 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
385 self.session_store = Some(store);
386 self
387 }
388
389 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
391 let dir = dir.into();
392 match tokio::runtime::Handle::try_current() {
393 Ok(handle) => {
394 match tokio::task::block_in_place(|| {
395 handle.block_on(crate::store::FileSessionStore::new(dir))
396 }) {
397 Ok(store) => {
398 self.session_store =
399 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
400 }
401 Err(e) => {
402 tracing::warn!("Failed to create file session store: {}", e);
403 }
404 }
405 }
406 Err(_) => {
407 tracing::warn!(
408 "No async runtime available for file session store — persistence disabled"
409 );
410 }
411 }
412 self
413 }
414
415 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
417 self.session_id = Some(id.into());
418 self
419 }
420
421 pub fn with_auto_save(mut self, enabled: bool) -> Self {
423 self.auto_save = enabled;
424 self
425 }
426
427 pub fn with_artifact_store_limits(mut self, limits: crate::tools::ArtifactStoreLimits) -> Self {
429 self.artifact_store_limits = Some(limits);
430 self
431 }
432
433 pub fn with_parse_retries(mut self, max: u32) -> Self {
439 self.max_parse_retries = Some(max);
440 self
441 }
442
443 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
449 self.tool_timeout_ms = Some(timeout_ms);
450 self
451 }
452
453 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
459 self.circuit_breaker_threshold = Some(threshold);
460 self
461 }
462
463 pub fn with_resilience_defaults(self) -> Self {
469 self.with_parse_retries(2)
470 .with_tool_timeout(120_000)
471 .with_circuit_breaker(3)
472 }
473
474 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
482 self.sandbox_handle = Some(handle);
483 self
484 }
485
486 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
491 self.auto_compact = enabled;
492 self
493 }
494
495 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
497 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
498 self
499 }
500
501 pub fn with_continuation(mut self, enabled: bool) -> Self {
506 self.continuation_enabled = Some(enabled);
507 self
508 }
509
510 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
512 self.max_continuation_turns = Some(turns);
513 self
514 }
515
516 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
521 self.mcp_manager = Some(manager);
522 self
523 }
524
525 pub fn with_temperature(mut self, temperature: f32) -> Self {
526 self.temperature = Some(temperature);
527 self
528 }
529
530 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
531 self.thinking_budget = Some(budget);
532 self
533 }
534
535 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
540 self.max_tool_rounds = Some(rounds);
541 self
542 }
543
544 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
549 self.prompt_slots = Some(slots);
550 self
551 }
552
553 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
559 self.hook_executor = Some(executor);
560 self
561 }
562}
563
564pub struct Agent {
573 code_config: CodeConfig,
574 config: AgentConfig,
575 global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
577 global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
580}
581
582impl std::fmt::Debug for Agent {
583 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
584 f.debug_struct("Agent").finish()
585 }
586}
587
588impl Agent {
589 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
593 let source = config_source.into();
594
595 let expanded = if let Some(rest) = source.strip_prefix("~/") {
597 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
598 if let Some(home) = home {
599 PathBuf::from(home).join(rest).display().to_string()
600 } else {
601 source.clone()
602 }
603 } else {
604 source.clone()
605 };
606
607 let path = Path::new(&expanded);
608
609 let ext = path.extension().and_then(|ext| ext.to_str());
610
611 let config = if matches!(ext, Some("acl")) {
612 if !path.exists() {
613 return Err(CodeError::Config(format!(
614 "Config file not found: {}",
615 path.display()
616 )));
617 }
618
619 CodeConfig::from_file(path)
620 .with_context(|| format!("Failed to load config: {}", path.display()))?
621 } else if matches!(ext, Some("hcl")) {
622 return Err(CodeError::Config(
623 "HCL config files are not supported in 2.0; rename the file to .acl".into(),
624 ));
625 } else if source.trim().starts_with('{') {
626 return Err(CodeError::Config(
627 "JSON config is not supported; use ACL-compatible .acl config".into(),
628 ));
629 } else if matches!(ext, Some("json")) {
630 return Err(CodeError::Config(
631 "JSON config files are not supported; use .acl".into(),
632 ));
633 } else {
634 CodeConfig::from_acl(&source).context("Failed to parse config as ACL string")?
635 };
636
637 Self::from_config(config).await
638 }
639
640 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
645 Self::new(config_source).await
646 }
647
648 pub async fn from_config(config: CodeConfig) -> Result<Self> {
650 config
651 .default_llm_config()
652 .context("default_model must be set in 'provider/model' format with a valid API key")?;
653
654 let agent_config = AgentConfig {
655 max_tool_rounds: config
656 .max_tool_rounds
657 .unwrap_or(AgentConfig::default().max_tool_rounds),
658 ..AgentConfig::default()
659 };
660
661 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
663 (None, vec![])
664 } else {
665 let manager = Arc::new(crate::mcp::manager::McpManager::new());
666 for server in &config.mcp_servers {
667 if !server.enabled {
668 continue;
669 }
670 manager.register_server(server.clone()).await;
671 if let Err(e) = manager.connect(&server.name).await {
672 tracing::warn!(
673 server = %server.name,
674 error = %e,
675 "Failed to connect to MCP server — skipping"
676 );
677 }
678 }
679 let tools = manager.get_all_tools().await;
681 (Some(manager), tools)
682 };
683
684 let mut agent = Agent {
685 code_config: config,
686 config: agent_config,
687 global_mcp,
688 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
689 };
690
691 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
693 for dir in &agent.code_config.skill_dirs.clone() {
694 if let Err(e) = registry.load_from_dir(dir) {
695 tracing::warn!(
696 dir = %dir.display(),
697 error = %e,
698 "Failed to load skills from directory — skipping"
699 );
700 }
701 }
702 agent.config.skill_registry = Some(registry);
703
704 Ok(agent)
705 }
706
707 pub async fn refresh_mcp_tools(&self) -> Result<()> {
715 if let Some(ref mcp) = self.global_mcp {
716 let fresh = mcp.get_all_tools().await;
717 *self
718 .global_mcp_tools
719 .lock()
720 .expect("global_mcp_tools lock poisoned") = fresh;
721 }
722 Ok(())
723 }
724
725 pub fn session(
730 &self,
731 workspace: impl Into<String>,
732 options: Option<SessionOptions>,
733 ) -> Result<AgentSession> {
734 let opts = options.unwrap_or_default();
735
736 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
739 (Some(global), Some(session)) => {
740 let global = Arc::clone(global);
741 let session_mgr = Arc::clone(session);
742 match tokio::runtime::Handle::try_current() {
743 Ok(handle) => {
744 let global_for_merge = Arc::clone(&global);
745 tokio::task::block_in_place(|| {
746 handle.block_on(async move {
747 for config in session_mgr.all_configs().await {
748 let name = config.name.clone();
749 global_for_merge.register_server(config).await;
750 if let Err(e) = global_for_merge.connect(&name).await {
751 tracing::warn!(
752 server = %name,
753 error = %e,
754 "Failed to connect session-level MCP server — skipping"
755 );
756 }
757 }
758 })
759 });
760 }
761 Err(_) => {
762 tracing::warn!(
763 "No async runtime available to merge session-level MCP servers \
764 into global manager — session MCP servers will not be available"
765 );
766 }
767 }
768 SessionOptions {
769 mcp_manager: Some(Arc::clone(&global)),
770 ..opts
771 }
772 }
773 (Some(global), None) => SessionOptions {
774 mcp_manager: Some(Arc::clone(global)),
775 ..opts
776 },
777 _ => opts,
778 };
779
780 let session_id = merged_opts
781 .session_id
782 .clone()
783 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
784 merged_opts.session_id = Some(session_id.clone());
785 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
786
787 self.build_session(workspace.into(), llm_client, &merged_opts)
788 }
789
790 pub fn session_for_agent(
805 &self,
806 workspace: impl Into<String>,
807 def: &crate::subagent::AgentDefinition,
808 extra: Option<SessionOptions>,
809 ) -> Result<AgentSession> {
810 let mut opts = extra.unwrap_or_default();
811
812 if opts.permission_checker.is_none()
814 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
815 {
816 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
817 }
818
819 if opts.max_tool_rounds.is_none() {
821 if let Some(steps) = def.max_steps {
822 opts.max_tool_rounds = Some(steps);
823 }
824 }
825
826 if opts.model.is_none() {
828 if let Some(ref m) = def.model {
829 let provider = m.provider.as_deref().unwrap_or("anthropic");
830 opts.model = Some(format!("{}/{}", provider, m.model));
831 }
832 }
833
834 if let Some(ref prompt) = def.prompt {
841 let slots = opts
842 .prompt_slots
843 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
844 if slots.extra.is_none() {
845 slots.extra = Some(prompt.clone());
846 }
847 }
848
849 self.session(workspace, Some(opts))
850 }
851
852 pub fn resume_session(
860 &self,
861 session_id: &str,
862 options: SessionOptions,
863 ) -> Result<AgentSession> {
864 let store = options.session_store.clone().ok_or_else(|| {
865 crate::error::CodeError::Session(
866 "resume_session requires a session_store in SessionOptions".to_string(),
867 )
868 })?;
869
870 let data = match tokio::runtime::Handle::try_current() {
872 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
873 .map_err(|e| {
874 crate::error::CodeError::Session(format!(
875 "Failed to load session {}: {}",
876 session_id, e
877 ))
878 })?,
879 Err(_) => {
880 return Err(crate::error::CodeError::Session(
881 "No async runtime available for session resume".to_string(),
882 ))
883 }
884 };
885
886 let data = data.ok_or_else(|| {
887 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
888 })?;
889
890 let mut opts = options;
892 opts.session_id = Some(data.id.clone());
893 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
894
895 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
896
897 *write_or_recover(&session.history) = data.messages;
899 let artifacts = match tokio::runtime::Handle::try_current() {
900 Ok(handle) => {
901 tokio::task::block_in_place(|| handle.block_on(store.load_artifacts(&data.id)))
902 .map_err(|e| {
903 crate::error::CodeError::Session(format!(
904 "Failed to load artifacts for session {}: {}",
905 data.id, e
906 ))
907 })?
908 }
909 Err(_) => None,
910 };
911 if let Some(artifacts) = artifacts {
912 let target_store = session.tool_executor.artifact_store();
913 for artifact in artifacts.artifacts() {
914 target_store.put(artifact);
915 }
916 }
917
918 let trace_events = match tokio::runtime::Handle::try_current() {
919 Ok(handle) => {
920 tokio::task::block_in_place(|| handle.block_on(store.load_trace_events(&data.id)))
921 .map_err(|e| {
922 crate::error::CodeError::Session(format!(
923 "Failed to load trace events for session {}: {}",
924 data.id, e
925 ))
926 })?
927 }
928 Err(_) => None,
929 };
930 if let Some(events) = trace_events {
931 session.trace_sink.replace_events(events);
932 }
933
934 let run_records = match tokio::runtime::Handle::try_current() {
935 Ok(handle) => {
936 tokio::task::block_in_place(|| handle.block_on(store.load_run_records(&data.id)))
937 .map_err(|e| {
938 crate::error::CodeError::Session(format!(
939 "Failed to load run records for session {}: {}",
940 data.id, e
941 ))
942 })?
943 }
944 Err(_) => None,
945 };
946 if let Some(records) = run_records {
947 if let Ok(handle) = tokio::runtime::Handle::try_current() {
948 tokio::task::block_in_place(|| {
949 handle.block_on(session.run_store.replace_records(records))
950 });
951 }
952 }
953
954 let verification_reports = match tokio::runtime::Handle::try_current() {
955 Ok(handle) => tokio::task::block_in_place(|| {
956 handle.block_on(store.load_verification_reports(&data.id))
957 })
958 .map_err(|e| {
959 crate::error::CodeError::Session(format!(
960 "Failed to load verification reports for session {}: {}",
961 data.id, e
962 ))
963 })?,
964 Err(_) => None,
965 };
966 if let Some(reports) = verification_reports {
967 *write_or_recover(&session.verification_reports) = reports;
968 }
969
970 Ok(session)
971 }
972
973 fn resolve_session_llm_client(
974 &self,
975 opts: &SessionOptions,
976 session_id: Option<&str>,
977 ) -> Result<Arc<dyn LlmClient>> {
978 let model_ref = if let Some(ref model) = opts.model {
979 model.as_str()
980 } else {
981 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
982 tracing::warn!(
983 "temperature/thinking_budget set without model override — these will be ignored. \
984 Use with_model() to apply LLM parameter overrides."
985 );
986 }
987 self.code_config
988 .default_model
989 .as_deref()
990 .context("default_model must be set in 'provider/model' format")?
991 };
992
993 let (provider_name, model_id) = model_ref
994 .split_once('/')
995 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
996
997 let mut llm_config = self
998 .code_config
999 .llm_config(provider_name, model_id)
1000 .with_context(|| {
1001 format!("provider '{provider_name}' or model '{model_id}' not found in config")
1002 })?;
1003
1004 if opts.model.is_some() {
1005 if let Some(temp) = opts.temperature {
1006 llm_config = llm_config.with_temperature(temp);
1007 }
1008 if let Some(budget) = opts.thinking_budget {
1009 llm_config = llm_config.with_thinking_budget(budget);
1010 }
1011 }
1012
1013 if let Some(session_id) = session_id {
1014 llm_config = llm_config.with_session_id(session_id);
1015 }
1016
1017 Ok(crate::llm::create_client_with_config(llm_config))
1018 }
1019
1020 fn build_session(
1021 &self,
1022 workspace: String,
1023 llm_client: Arc<dyn LlmClient>,
1024 opts: &SessionOptions,
1025 ) -> Result<AgentSession> {
1026 let canonical = safe_canonicalize(Path::new(&workspace));
1027
1028 let artifact_limits = opts.artifact_store_limits.unwrap_or_default();
1029 let tool_executor = Arc::new(ToolExecutor::new_with_artifact_limits(
1030 canonical.display().to_string(),
1031 artifact_limits,
1032 ));
1033 let trace_sink = crate::trace::InMemoryTraceSink::default();
1034 tool_executor.set_trace_sink(Arc::new(trace_sink.clone()));
1035
1036 if let Some(ref search_config) = self.code_config.search {
1038 tool_executor
1039 .registry()
1040 .set_search_config(search_config.clone());
1041 }
1042
1043 let agent_registry = {
1047 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1048 use crate::tools::register_task_with_mcp;
1049 let registry = AgentRegistry::new();
1050 for dir in self
1051 .code_config
1052 .agent_dirs
1053 .iter()
1054 .chain(opts.agent_dirs.iter())
1055 {
1056 for agent in load_agents_from_dir(dir) {
1057 registry.register(agent);
1058 }
1059 }
1060 let registry = Arc::new(registry);
1061 register_task_with_mcp(
1062 tool_executor.registry(),
1063 Arc::clone(&llm_client),
1064 Arc::clone(®istry),
1065 canonical.display().to_string(),
1066 opts.mcp_manager.clone(),
1067 );
1068 registry
1069 };
1070
1071 if let Some(ref mcp) = opts.mcp_manager {
1074 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1077 Arc::as_ptr(mcp),
1078 self.global_mcp
1079 .as_ref()
1080 .map(Arc::as_ptr)
1081 .unwrap_or(std::ptr::null()),
1082 ) {
1083 self.global_mcp_tools
1085 .lock()
1086 .expect("global_mcp_tools lock poisoned")
1087 .clone()
1088 } else {
1089 match tokio::runtime::Handle::try_current() {
1091 Ok(handle) => {
1092 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1093 }
1094 Err(_) => {
1095 tracing::warn!(
1096 "No async runtime available for session-level MCP tools — \
1097 MCP tools will not be registered"
1098 );
1099 vec![]
1100 }
1101 }
1102 };
1103
1104 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1105 std::collections::HashMap::new();
1106 for (server, tool) in all_tools {
1107 by_server.entry(server).or_default().push(tool);
1108 }
1109 for (server_name, tools) in by_server {
1110 for tool in
1111 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1112 {
1113 tool_executor.register_dynamic_tool(tool);
1114 }
1115 }
1116 }
1117
1118 let tool_defs = tool_executor.definitions();
1119
1120 let prompt_slots = opts
1122 .prompt_slots
1123 .clone()
1124 .unwrap_or_else(|| self.config.prompt_slots.clone());
1125
1126 let mut context_providers = opts.context_providers.clone();
1127
1128 let agents_md_path = canonical.join("AGENTS.md");
1130 if agents_md_path.exists() && agents_md_path.is_file() {
1131 match std::fs::read_to_string(&agents_md_path) {
1132 Ok(content) if !content.trim().is_empty() => {
1133 tracing::info!(
1134 path = %agents_md_path.display(),
1135 "Auto-loaded AGENTS.md from workspace root"
1136 );
1137 let token_count = content.split_whitespace().count().max(1);
1138 let item = ContextItem::new(
1139 "agents_md",
1140 ContextType::Resource,
1141 format!("# Project Instructions (AGENTS.md)\n\n{}", content),
1142 )
1143 .with_source(format!("file://{}", agents_md_path.display()))
1144 .with_provenance("workspace_instructions")
1145 .with_priority(0.95)
1146 .with_trust(0.95)
1147 .with_freshness(1.0)
1148 .with_relevance(0.95)
1149 .with_token_count(token_count);
1150
1151 context_providers.push(Arc::new(
1152 StaticContextProvider::new("agents_md").with_item(item),
1153 ));
1154 }
1155 Ok(_) => {
1156 tracing::debug!(
1157 path = %agents_md_path.display(),
1158 "AGENTS.md exists but is empty — skipping"
1159 );
1160 }
1161 Err(e) => {
1162 tracing::warn!(
1163 path = %agents_md_path.display(),
1164 error = %e,
1165 "Failed to read AGENTS.md — skipping"
1166 );
1167 }
1168 }
1169 }
1170
1171 let base_registry = self
1175 .config
1176 .skill_registry
1177 .as_deref()
1178 .map(|r| r.fork())
1179 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1180 if let Some(ref r) = opts.skill_registry {
1182 for skill in r.all() {
1183 base_registry.register_unchecked(skill);
1184 }
1185 }
1186 for dir in &opts.skill_dirs {
1188 if let Err(e) = base_registry.load_from_dir(dir) {
1189 tracing::warn!(
1190 dir = %dir.display(),
1191 error = %e,
1192 "Failed to load session skill dir — skipping"
1193 );
1194 }
1195 }
1196 let effective_registry = Arc::new(base_registry);
1197
1198 let skill_prompt = effective_registry.to_system_prompt();
1200 if !skill_prompt.is_empty() {
1201 let item = ContextItem::new("skills_catalog", ContextType::Skill, skill_prompt)
1202 .with_source("a3s://skills/catalog")
1203 .with_provenance("skill_registry")
1204 .with_priority(0.85)
1205 .with_trust(0.9)
1206 .with_freshness(1.0)
1207 .with_relevance(1.0);
1208 context_providers.push(Arc::new(
1209 StaticContextProvider::new("skills_catalog").with_item(item),
1210 ));
1211 }
1212
1213 let mut init_warning: Option<String> = None;
1215 let memory = {
1216 let store = if let Some(ref store) = opts.memory_store {
1217 Some(Arc::clone(store))
1218 } else if let Some(ref dir) = opts.file_memory_dir {
1219 match tokio::runtime::Handle::try_current() {
1220 Ok(handle) => {
1221 let dir = dir.clone();
1222 match tokio::task::block_in_place(|| {
1223 handle.block_on(FileMemoryStore::new(dir))
1224 }) {
1225 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1226 Err(e) => {
1227 let msg = format!("Failed to create file memory store: {}", e);
1228 tracing::warn!("{}", msg);
1229 init_warning = Some(msg);
1230 None
1231 }
1232 }
1233 }
1234 Err(_) => {
1235 let msg =
1236 "No async runtime available for file memory store — memory disabled"
1237 .to_string();
1238 tracing::warn!("{}", msg);
1239 init_warning = Some(msg);
1240 None
1241 }
1242 }
1243 } else {
1244 None
1245 };
1246 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1247 };
1248
1249 let base = self.config.clone();
1250
1251 let config = AgentConfig {
1252 prompt_slots,
1253 tools: tool_defs,
1254 security_provider: opts.security_provider.clone(),
1255 permission_checker: opts.permission_checker.clone(),
1256 confirmation_manager: None, context_providers,
1258 planning_mode: opts.planning_mode,
1259 goal_tracking: opts.goal_tracking,
1260 skill_registry: Some(Arc::clone(&effective_registry)),
1261 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1262 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1263 circuit_breaker_threshold: opts
1264 .circuit_breaker_threshold
1265 .unwrap_or(base.circuit_breaker_threshold),
1266 auto_compact: opts.auto_compact,
1267 auto_compact_threshold: opts
1268 .auto_compact_threshold
1269 .unwrap_or(crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD),
1270 max_context_tokens: base.max_context_tokens,
1271 memory: memory.clone(),
1272 continuation_enabled: opts
1273 .continuation_enabled
1274 .unwrap_or(base.continuation_enabled),
1275 max_continuation_turns: opts
1276 .max_continuation_turns
1277 .unwrap_or(base.max_continuation_turns),
1278 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1279 max_execution_time_ms: opts.max_execution_time_ms.or(base.max_execution_time_ms),
1280 ..base
1281 };
1282
1283 {
1287 use crate::tools::register_skill;
1288 register_skill(
1289 tool_executor.registry(),
1290 Arc::clone(&llm_client),
1291 Arc::clone(&effective_registry),
1292 Arc::clone(&tool_executor),
1293 config.clone(),
1294 );
1295 }
1296
1297 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(2048);
1300
1301 let confirmation_manager = if opts.confirmation_manager.is_some() {
1303 opts.confirmation_manager.clone()
1304 } else if let Some(policy) = &opts.confirmation_policy {
1305 let manager = Arc::new(crate::hitl::ConfirmationManager::new(
1307 policy.clone(),
1308 agent_event_tx.clone(),
1309 ));
1310 Some(manager as Arc<dyn crate::hitl::ConfirmationProvider>)
1311 } else {
1312 None
1313 };
1314
1315 let config = AgentConfig {
1317 confirmation_manager,
1318 ..config
1319 };
1320
1321 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1322 let session_id = uuid::Uuid::new_v4().to_string();
1323 let rt = tokio::runtime::Handle::try_current();
1324
1325 match rt {
1326 Ok(handle) => {
1327 let queue = tokio::task::block_in_place(|| {
1329 handle.block_on(SessionLaneQueue::new(
1330 &session_id,
1331 queue_config.clone(),
1332 agent_event_tx.clone(),
1333 ))
1334 });
1335 match queue {
1336 Ok(q) => {
1337 let q = Arc::new(q);
1339 let q2 = Arc::clone(&q);
1340 tokio::task::block_in_place(|| {
1341 handle.block_on(async { q2.start().await.ok() })
1342 });
1343 Some(q)
1344 }
1345 Err(e) => {
1346 tracing::warn!("Failed to create session lane queue: {}", e);
1347 None
1348 }
1349 }
1350 }
1351 Err(_) => {
1352 tracing::warn!(
1353 "No async runtime available for queue creation — queue disabled"
1354 );
1355 None
1356 }
1357 }
1358 } else {
1359 None
1360 };
1361
1362 let mut tool_context = ToolContext::new(canonical.clone());
1364 if let Some(ref search_config) = self.code_config.search {
1365 tool_context = tool_context.with_search_config(search_config.clone());
1366 }
1367 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1368
1369 if let Some(handle) = opts.sandbox_handle.clone() {
1371 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1372 tool_context = tool_context.with_sandbox(handle);
1373 }
1374
1375 let session_id = opts
1376 .session_id
1377 .clone()
1378 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1379
1380 let session_store = if opts.session_store.is_some() {
1382 opts.session_store.clone()
1383 } else if let Some(ref dir) = self.code_config.sessions_dir {
1384 match tokio::runtime::Handle::try_current() {
1385 Ok(handle) => {
1386 let dir = dir.clone();
1387 match tokio::task::block_in_place(|| {
1388 handle.block_on(crate::store::FileSessionStore::new(dir))
1389 }) {
1390 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1391 Err(e) => {
1392 tracing::warn!(
1393 "Failed to create session store from sessions_dir: {}",
1394 e
1395 );
1396 None
1397 }
1398 }
1399 }
1400 Err(_) => {
1401 tracing::warn!(
1402 "No async runtime for sessions_dir store — persistence disabled"
1403 );
1404 None
1405 }
1406 }
1407 } else {
1408 None
1409 };
1410
1411 let command_registry = CommandRegistry::new();
1412
1413 Ok(AgentSession {
1414 llm_client,
1415 tool_executor,
1416 tool_context,
1417 memory: config.memory.clone(),
1418 config,
1419 workspace: canonical,
1420 session_id,
1421 history: Arc::new(RwLock::new(Vec::new())),
1422 command_queue,
1423 session_store,
1424 auto_save: opts.auto_save,
1425 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1426 ahp_executor: opts.hook_executor.clone(),
1427 init_warning,
1428 command_registry: std::sync::Mutex::new(command_registry),
1429 model_name: opts
1430 .model
1431 .clone()
1432 .or_else(|| self.code_config.default_model.clone())
1433 .unwrap_or_else(|| "unknown".to_string()),
1434 mcp_manager: opts
1435 .mcp_manager
1436 .clone()
1437 .or_else(|| self.global_mcp.clone())
1438 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1439 agent_registry,
1440 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1441 current_run_id: Arc::new(tokio::sync::Mutex::new(None)),
1442 run_store: Arc::new(crate::run::InMemoryRunStore::new()),
1443 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1444 trace_sink,
1445 verification_reports: Arc::new(RwLock::new(Vec::new())),
1446 })
1447 }
1448}
1449
1450#[derive(Debug, Clone)]
1459pub struct BtwResult {
1460 pub question: String,
1462 pub answer: String,
1464 pub usage: crate::llm::TokenUsage,
1466}
1467
1468pub struct AgentSession {
1478 llm_client: Arc<dyn LlmClient>,
1479 tool_executor: Arc<ToolExecutor>,
1480 tool_context: ToolContext,
1481 config: AgentConfig,
1482 workspace: PathBuf,
1483 session_id: String,
1485 history: Arc<RwLock<Vec<Message>>>,
1487 command_queue: Option<Arc<SessionLaneQueue>>,
1489 memory: Option<Arc<crate::memory::AgentMemory>>,
1491 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1493 auto_save: bool,
1495 hook_engine: Arc<crate::hooks::HookEngine>,
1497 ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
1500 init_warning: Option<String>,
1502 command_registry: std::sync::Mutex<CommandRegistry>,
1505 model_name: String,
1507 mcp_manager: Arc<crate::mcp::manager::McpManager>,
1509 agent_registry: Arc<crate::subagent::AgentRegistry>,
1511 cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
1514 current_run_id: Arc<tokio::sync::Mutex<Option<String>>>,
1516 run_store: Arc<crate::run::InMemoryRunStore>,
1518 active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1520 trace_sink: crate::trace::InMemoryTraceSink,
1522 verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
1524}
1525
1526#[derive(Debug, Clone)]
1527struct ActiveToolSnapshot {
1528 tool_name: String,
1529 started_at_ms: u64,
1530}
1531
1532#[derive(Clone)]
1533struct SessionPersistenceContext {
1534 session_store: Option<Arc<dyn crate::store::SessionStore>>,
1535 session_id: String,
1536 workspace: PathBuf,
1537 config: AgentConfig,
1538 tool_executor: Arc<ToolExecutor>,
1539 trace_sink: crate::trace::InMemoryTraceSink,
1540 run_store: Arc<crate::run::InMemoryRunStore>,
1541 history: Arc<RwLock<Vec<Message>>>,
1542 verification_reports: Arc<RwLock<Vec<crate::verification::VerificationReport>>>,
1543 auto_save: bool,
1544}
1545
1546impl SessionPersistenceContext {
1547 fn from_session(session: &AgentSession) -> Self {
1548 Self {
1549 session_store: session.session_store.clone(),
1550 session_id: session.session_id.clone(),
1551 workspace: session.workspace.clone(),
1552 config: session.config.clone(),
1553 tool_executor: Arc::clone(&session.tool_executor),
1554 trace_sink: session.trace_sink.clone(),
1555 run_store: Arc::clone(&session.run_store),
1556 history: Arc::clone(&session.history),
1557 verification_reports: Arc::clone(&session.verification_reports),
1558 auto_save: session.auto_save,
1559 }
1560 }
1561
1562 fn record_result(&self, result: &AgentResult) {
1563 *write_or_recover(&self.history) = result.messages.clone();
1564 if !result.verification_reports.is_empty() {
1565 write_or_recover(&self.verification_reports)
1566 .extend(result.verification_reports.clone());
1567 }
1568 }
1569
1570 async fn save(&self) -> Result<()> {
1571 let store = match &self.session_store {
1572 Some(store) => store,
1573 None => return Ok(()),
1574 };
1575
1576 let history = read_or_recover(&self.history).clone();
1577 let verification_reports = read_or_recover(&self.verification_reports).clone();
1578 let now = chrono::Utc::now().timestamp();
1579
1580 let data = crate::store::SessionData {
1581 id: self.session_id.clone(),
1582 config: crate::store::SessionConfig {
1583 name: String::new(),
1584 workspace: self.workspace.display().to_string(),
1585 system_prompt: Some(self.config.prompt_slots.build()),
1586 max_context_length: 200_000,
1587 auto_compact: false,
1588 auto_compact_threshold: crate::store::DEFAULT_AUTO_COMPACT_THRESHOLD,
1589 storage_type: crate::config::StorageBackend::File,
1590 queue_config: None,
1591 confirmation_policy: None,
1592 permission_policy: None,
1593 parent_id: None,
1594 security_config: None,
1595 hook_engine: None,
1596 planning_mode: self.config.planning_mode,
1597 goal_tracking: self.config.goal_tracking,
1598 },
1599 state: crate::store::SessionState::Active,
1600 messages: history,
1601 context_usage: crate::store::ContextUsage::default(),
1602 total_usage: crate::llm::TokenUsage::default(),
1603 total_cost: 0.0,
1604 model_name: None,
1605 cost_records: Vec::new(),
1606 tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
1607 thinking_enabled: false,
1608 thinking_budget: None,
1609 created_at: now,
1610 updated_at: now,
1611 llm_config: None,
1612 tasks: Vec::new(),
1613 parent_id: None,
1614 };
1615
1616 store.save(&data).await?;
1617 store
1618 .save_artifacts(&self.session_id, &self.tool_executor.artifact_store())
1619 .await?;
1620 store
1621 .save_trace_events(&self.session_id, &self.trace_sink.events())
1622 .await?;
1623 store
1624 .save_run_records(&self.session_id, &self.run_store.records().await)
1625 .await?;
1626 store
1627 .save_verification_reports(&self.session_id, &verification_reports)
1628 .await?;
1629 tracing::debug!("Session {} saved", self.session_id);
1630 Ok(())
1631 }
1632
1633 async fn auto_save_if_enabled(&self) {
1634 if self.auto_save {
1635 if let Err(e) = self.save().await {
1636 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1637 }
1638 }
1639 }
1640}
1641
1642impl std::fmt::Debug for AgentSession {
1643 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1644 f.debug_struct("AgentSession")
1645 .field("session_id", &self.session_id)
1646 .field("workspace", &self.workspace.display().to_string())
1647 .field("auto_save", &self.auto_save)
1648 .finish()
1649 }
1650}
1651
1652impl AgentSession {
1653 fn now_ms() -> u64 {
1654 std::time::SystemTime::now()
1655 .duration_since(std::time::UNIX_EPOCH)
1656 .map(|d| d.as_millis() as u64)
1657 .unwrap_or(0)
1658 }
1659
1660 fn compact_json_value(value: &serde_json::Value) -> String {
1661 let raw = match value {
1662 serde_json::Value::Null => String::new(),
1663 serde_json::Value::String(s) => s.clone(),
1664 _ => serde_json::to_string(value).unwrap_or_default(),
1665 };
1666 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1667 if compact.len() > 180 {
1668 format!("{}...", truncate_utf8(&compact, 180))
1669 } else {
1670 compact
1671 }
1672 }
1673
1674 async fn start_run(&self, prompt: &str) -> crate::run::RunHandle {
1675 let snapshot = self.run_store.create_run(&self.session_id, prompt).await;
1676 *self.current_run_id.lock().await = Some(snapshot.id.clone());
1677 crate::run::RunHandle::new(
1678 snapshot.id,
1679 self.session_id.clone(),
1680 Arc::clone(&self.run_store),
1681 Arc::clone(&self.cancel_token),
1682 Arc::clone(&self.current_run_id),
1683 self.ahp_executor.clone(),
1684 )
1685 }
1686
1687 async fn finish_run_if_current(&self, run_id: &str) {
1688 let mut current = self.current_run_id.lock().await;
1689 if current.as_deref() == Some(run_id) {
1690 *current = None;
1691 }
1692 }
1693
1694 async fn record_runtime_event(
1695 run_store: &Arc<crate::run::InMemoryRunStore>,
1696 run_id: &str,
1697 session_id: &str,
1698 hook_executor: &Option<Arc<dyn crate::hooks::HookExecutor>>,
1699 event: &AgentEvent,
1700 ) {
1701 let _ = run_store.record_event(run_id, event.clone()).await;
1702 if let Some(executor) = hook_executor {
1703 executor.record_agent_event(event, run_id, session_id).await;
1704 }
1705 }
1706
1707 async fn apply_runtime_event(
1708 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1709 event: &AgentEvent,
1710 ) {
1711 match event {
1712 AgentEvent::ToolStart { id, name } => {
1713 active_tools.write().await.insert(
1714 id.clone(),
1715 ActiveToolSnapshot {
1716 tool_name: name.clone(),
1717 started_at_ms: Self::now_ms(),
1718 },
1719 );
1720 }
1721 AgentEvent::ToolEnd { id, .. }
1722 | AgentEvent::PermissionDenied { tool_id: id, .. }
1723 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1724 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1725 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1726 active_tools.write().await.remove(id);
1727 }
1728 _ => {}
1729 }
1730 }
1731
1732 async fn clear_runtime_tracking(&self) {
1733 self.active_tools.write().await.clear();
1734 }
1735
1736 fn build_agent_loop(&self) -> AgentLoop {
1740 let mut config = self.config.clone();
1741 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1742 ahp.clone()
1743 } else {
1744 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1745 });
1746 config.tools = self.tool_executor.definitions();
1750 let mut agent_loop = AgentLoop::new(
1751 self.llm_client.clone(),
1752 self.tool_executor.clone(),
1753 self.tool_context.clone(),
1754 config,
1755 );
1756 if let Some(ref queue) = self.command_queue {
1757 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1758 }
1759 agent_loop
1760 }
1761
1762 fn build_command_context(&self) -> CommandContext {
1764 let history = read_or_recover(&self.history);
1765
1766 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1768
1769 let mut mcp_map: std::collections::HashMap<String, usize> =
1771 std::collections::HashMap::new();
1772 for name in &tool_names {
1773 if let Some(rest) = name.strip_prefix("mcp__") {
1774 if let Some((server, _)) = rest.split_once("__") {
1775 *mcp_map.entry(server.to_string()).or_default() += 1;
1776 }
1777 }
1778 }
1779 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1780 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1781
1782 CommandContext {
1783 session_id: self.session_id.clone(),
1784 workspace: self.workspace.display().to_string(),
1785 model: self.model_name.clone(),
1786 history_len: history.len(),
1787 total_tokens: 0,
1788 total_cost: 0.0,
1789 tool_names,
1790 mcp_servers,
1791 }
1792 }
1793
1794 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1798 self.command_registry
1799 .lock()
1800 .expect("command_registry lock poisoned")
1801 }
1802
1803 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1807 self.command_registry
1808 .lock()
1809 .expect("command_registry lock poisoned")
1810 .register(cmd);
1811 }
1812
1813 pub async fn close(&self) {
1815 let _ = self.cancel().await;
1816 }
1817
1818 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1827 if CommandRegistry::is_command(prompt) {
1829 let ctx = self.build_command_context();
1830 let output = self.command_registry().dispatch(prompt, &ctx);
1831 if let Some(output) = output {
1833 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1835 let result = self.btw(question).await?;
1836 return Ok(AgentResult {
1837 text: result.answer,
1838 messages: history
1839 .map(|h| h.to_vec())
1840 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1841 tool_calls_count: 0,
1842 usage: result.usage,
1843 verification_reports: Vec::new(),
1844 });
1845 }
1846 return Ok(AgentResult {
1847 text: output.text,
1848 messages: history
1849 .map(|h| h.to_vec())
1850 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1851 tool_calls_count: 0,
1852 usage: crate::llm::TokenUsage::default(),
1853 verification_reports: Vec::new(),
1854 });
1855 }
1856 }
1857
1858 if let Some(ref w) = self.init_warning {
1859 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1860 }
1861 let run = self.start_run(prompt).await;
1862 let run_id = run.id().to_string();
1863 let agent_loop = self.build_agent_loop();
1864 let (runtime_tx, mut runtime_rx) = mpsc::channel(2048);
1865 let runtime_state = Arc::clone(&self.active_tools);
1866 let run_store = Arc::clone(&self.run_store);
1867 let collector_run_id = run_id.clone();
1868 let collector_session_id = self.session_id.clone();
1869 let collector_hook_executor = self.ahp_executor.clone();
1870 let runtime_collector = tokio::spawn(async move {
1871 while let Some(event) = runtime_rx.recv().await {
1872 AgentSession::record_runtime_event(
1873 &run_store,
1874 &collector_run_id,
1875 &collector_session_id,
1876 &collector_hook_executor,
1877 &event,
1878 )
1879 .await;
1880 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1881 }
1882 });
1883
1884 let use_internal = history.is_none();
1885 let effective_history = match history {
1886 Some(h) => h.to_vec(),
1887 None => read_or_recover(&self.history).clone(),
1888 };
1889
1890 let cancel_token = tokio_util::sync::CancellationToken::new();
1891 *self.cancel_token.lock().await = Some(cancel_token.clone());
1892 let result = agent_loop
1893 .execute_with_session(
1894 &effective_history,
1895 prompt,
1896 Some(&self.session_id),
1897 Some(runtime_tx),
1898 Some(&cancel_token),
1899 )
1900 .await;
1901 *self.cancel_token.lock().await = None;
1902 let _ = runtime_collector.await;
1903 let result = match result {
1904 Ok(result) => result,
1905 Err(error) => {
1906 let _ = self.run_store.mark_failed(&run_id, error.to_string()).await;
1907 self.clear_runtime_tracking().await;
1908 self.finish_run_if_current(&run_id).await;
1909 return Err(error.into());
1910 }
1911 };
1912
1913 if use_internal {
1916 *write_or_recover(&self.history) = result.messages.clone();
1917 self.record_verification_reports(result.verification_reports.clone());
1918
1919 if self.auto_save {
1921 if let Err(e) = self.save().await {
1922 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1923 }
1924 }
1925 }
1926
1927 self.clear_runtime_tracking().await;
1928 self.finish_run_if_current(&run_id).await;
1929
1930 Ok(result)
1931 }
1932
1933 async fn build_btw_runtime_context(&self) -> String {
1934 let mut sections = Vec::new();
1935
1936 let active_tools = {
1937 let tools = self.active_tools.read().await;
1938 let mut items = tools
1939 .iter()
1940 .map(|(tool_id, tool)| {
1941 let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
1942 format!(
1943 "- {} [{}] running_for={}ms",
1944 tool.tool_name, tool_id, elapsed_ms
1945 )
1946 })
1947 .collect::<Vec<_>>();
1948 items.sort();
1949 items
1950 };
1951 if !active_tools.is_empty() {
1952 sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
1953 }
1954
1955 if let Some(cm) = &self.config.confirmation_manager {
1956 let pending = cm.pending_confirmations().await;
1957 if !pending.is_empty() {
1958 let mut lines = pending
1959 .into_iter()
1960 .map(
1961 |PendingConfirmationInfo {
1962 tool_id,
1963 tool_name,
1964 args,
1965 remaining_ms,
1966 }| {
1967 let arg_summary = Self::compact_json_value(&args);
1968 if arg_summary.is_empty() {
1969 format!(
1970 "- {} [{}] remaining={}ms",
1971 tool_name, tool_id, remaining_ms
1972 )
1973 } else {
1974 format!(
1975 "- {} [{}] remaining={}ms {}",
1976 tool_name, tool_id, remaining_ms, arg_summary
1977 )
1978 }
1979 },
1980 )
1981 .collect::<Vec<_>>();
1982 lines.sort();
1983 sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
1984 }
1985 }
1986
1987 if let Some(queue) = &self.command_queue {
1988 let stats = queue.stats().await;
1989 if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
1990 let mut lines = vec![format!(
1991 "active={}, pending={}, external_pending={}",
1992 stats.total_active, stats.total_pending, stats.external_pending
1993 )];
1994 let mut lanes = stats
1995 .lanes
1996 .into_values()
1997 .filter(|lane| lane.active > 0 || lane.pending > 0)
1998 .map(|lane| {
1999 format!(
2000 "- {:?}: active={}, pending={}, handler={:?}",
2001 lane.lane, lane.active, lane.pending, lane.handler_mode
2002 )
2003 })
2004 .collect::<Vec<_>>();
2005 lanes.sort();
2006 lines.extend(lanes);
2007 sections.push(format!("[session queue]\n{}", lines.join("\n")));
2008 }
2009
2010 let external_tasks = queue.pending_external_tasks().await;
2011 if !external_tasks.is_empty() {
2012 let mut lines = external_tasks
2013 .into_iter()
2014 .take(6)
2015 .map(|task| {
2016 let payload_summary = Self::compact_json_value(&task.payload);
2017 if payload_summary.is_empty() {
2018 format!(
2019 "- {} {:?} remaining={}ms",
2020 task.command_type,
2021 task.lane,
2022 task.remaining_ms()
2023 )
2024 } else {
2025 format!(
2026 "- {} {:?} remaining={}ms {}",
2027 task.command_type,
2028 task.lane,
2029 task.remaining_ms(),
2030 payload_summary
2031 )
2032 }
2033 })
2034 .collect::<Vec<_>>();
2035 lines.sort();
2036 sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
2037 }
2038 }
2039
2040 if let Some(store) = &self.session_store {
2041 if let Ok(Some(session)) = store.load(&self.session_id).await {
2042 let active_tasks = session
2043 .tasks
2044 .into_iter()
2045 .filter(|task| task.status.is_active())
2046 .take(6)
2047 .map(|task| match task.tool {
2048 Some(tool) if !tool.is_empty() => {
2049 format!("- [{}] {} ({})", task.status, task.content, tool)
2050 }
2051 _ => format!("- [{}] {}", task.status, task.content),
2052 })
2053 .collect::<Vec<_>>();
2054 if !active_tasks.is_empty() {
2055 sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
2056 }
2057 }
2058 }
2059
2060 sections.join("\n\n")
2061 }
2062
2063 pub async fn btw(&self, question: &str) -> Result<BtwResult> {
2081 self.btw_with_context(question, None).await
2082 }
2083
2084 pub async fn btw_with_context(
2089 &self,
2090 question: &str,
2091 runtime_context: Option<&str>,
2092 ) -> Result<BtwResult> {
2093 let question = question.trim();
2094 if question.is_empty() {
2095 return Err(crate::error::CodeError::Session(
2096 "btw: question cannot be empty".to_string(),
2097 ));
2098 }
2099
2100 let history_snapshot = read_or_recover(&self.history).clone();
2102
2103 let mut messages = history_snapshot;
2105 let mut injected_sections = Vec::new();
2106 let session_runtime = self.build_btw_runtime_context().await;
2107 if !session_runtime.is_empty() {
2108 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
2109 }
2110 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
2111 injected_sections.push(format!("[host runtime context]\n{}", extra));
2112 }
2113 if !injected_sections.is_empty() {
2114 let injected_context = format!(
2115 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
2116 injected_sections.join("\n\n")
2117 );
2118 messages.push(Message::user(&injected_context));
2119 }
2120 messages.push(Message::user(question));
2121
2122 let response = self
2123 .llm_client
2124 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2125 .await
2126 .map_err(|e| {
2127 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2128 })?;
2129
2130 Ok(BtwResult {
2131 question: question.to_string(),
2132 answer: response.text(),
2133 usage: response.usage,
2134 })
2135 }
2136
2137 pub async fn send_with_attachments(
2142 &self,
2143 prompt: &str,
2144 attachments: &[crate::llm::Attachment],
2145 history: Option<&[Message]>,
2146 ) -> Result<AgentResult> {
2147 let use_internal = history.is_none();
2151 let mut effective_history = match history {
2152 Some(h) => h.to_vec(),
2153 None => read_or_recover(&self.history).clone(),
2154 };
2155 effective_history.push(Message::user_with_attachments(prompt, attachments));
2156
2157 let run = self.start_run(prompt).await;
2158 let run_id = run.id().to_string();
2159 let agent_loop = self.build_agent_loop();
2160 let (runtime_tx, mut runtime_rx) = mpsc::channel(2048);
2161 let runtime_state = Arc::clone(&self.active_tools);
2162 let run_store = Arc::clone(&self.run_store);
2163 let collector_run_id = run_id.clone();
2164 let collector_session_id = self.session_id.clone();
2165 let collector_hook_executor = self.ahp_executor.clone();
2166 let runtime_collector = tokio::spawn(async move {
2167 while let Some(event) = runtime_rx.recv().await {
2168 AgentSession::record_runtime_event(
2169 &run_store,
2170 &collector_run_id,
2171 &collector_session_id,
2172 &collector_hook_executor,
2173 &event,
2174 )
2175 .await;
2176 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2177 }
2178 });
2179
2180 let cancel_token = tokio_util::sync::CancellationToken::new();
2181 *self.cancel_token.lock().await = Some(cancel_token.clone());
2182 let result = agent_loop
2183 .execute_from_messages(
2184 effective_history,
2185 Some(&self.session_id),
2186 Some(runtime_tx),
2187 Some(&cancel_token),
2188 )
2189 .await;
2190 *self.cancel_token.lock().await = None;
2191 let _ = runtime_collector.await;
2192 let result = match result {
2193 Ok(result) => result,
2194 Err(error) => {
2195 let _ = self.run_store.mark_failed(&run_id, error.to_string()).await;
2196 self.clear_runtime_tracking().await;
2197 self.finish_run_if_current(&run_id).await;
2198 return Err(error.into());
2199 }
2200 };
2201
2202 if use_internal {
2203 *write_or_recover(&self.history) = result.messages.clone();
2204 self.record_verification_reports(result.verification_reports.clone());
2205 if self.auto_save {
2206 if let Err(e) = self.save().await {
2207 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2208 }
2209 }
2210 }
2211
2212 self.clear_runtime_tracking().await;
2213 self.finish_run_if_current(&run_id).await;
2214
2215 Ok(result)
2216 }
2217
2218 pub async fn stream_with_attachments(
2223 &self,
2224 prompt: &str,
2225 attachments: &[crate::llm::Attachment],
2226 history: Option<&[Message]>,
2227 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2228 let (tx, rx) = mpsc::channel(256);
2229 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2230 let use_internal = history.is_none();
2231 let mut effective_history = match history {
2232 Some(h) => h.to_vec(),
2233 None => read_or_recover(&self.history).clone(),
2234 };
2235 effective_history.push(Message::user_with_attachments(prompt, attachments));
2236
2237 let run = self.start_run(prompt).await;
2238 let run_id = run.id().to_string();
2239 let agent_loop = self.build_agent_loop();
2240 let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));
2241 let session_id = self.session_id.clone();
2242 let cancel_token = tokio_util::sync::CancellationToken::new();
2243 *self.cancel_token.lock().await = Some(cancel_token.clone());
2244 let token_clone = cancel_token.clone();
2245 let runtime_state = Arc::clone(&self.active_tools);
2246 let run_store = Arc::clone(&self.run_store);
2247 let forwarder_run_id = run_id.clone();
2248 let forwarder_session_id = self.session_id.clone();
2249 let forwarder_hook_executor = self.ahp_executor.clone();
2250 let should_auto_save = Arc::new(std::sync::atomic::AtomicBool::new(false));
2251 let forwarder = tokio::spawn(async move {
2252 while let Some(event) = runtime_rx.recv().await {
2253 AgentSession::record_runtime_event(
2254 &run_store,
2255 &forwarder_run_id,
2256 &forwarder_session_id,
2257 &forwarder_hook_executor,
2258 &event,
2259 )
2260 .await;
2261 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2262 if tx.send(event).await.is_err() {
2263 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2266 break;
2267 }
2268 }
2269 });
2270 let run_store = Arc::clone(&self.run_store);
2271 let worker_run_id = run_id.clone();
2272 let persistence_for_worker = persistence.clone();
2273 let should_auto_save_for_worker = Arc::clone(&should_auto_save);
2274 let handle = tokio::spawn(async move {
2275 let result = agent_loop
2276 .execute_from_messages(
2277 effective_history,
2278 Some(&session_id),
2279 Some(runtime_tx),
2280 Some(&token_clone),
2281 )
2282 .await;
2283 match result {
2284 Ok(result) => {
2285 if let Some(persistence) = persistence_for_worker {
2286 persistence.record_result(&result);
2287 should_auto_save_for_worker
2288 .store(true, std::sync::atomic::Ordering::Release);
2289 }
2290 }
2291 Err(error) => {
2292 let _ = run_store
2293 .mark_failed(&worker_run_id, error.to_string())
2294 .await;
2295 }
2296 }
2297 });
2298 let active_tools = Arc::clone(&self.active_tools);
2299 let current_run_id = Arc::clone(&self.current_run_id);
2300 let cancel_token_ref = self.cancel_token.clone();
2301 let wrapped_handle = tokio::spawn(async move {
2302 let _ = handle.await;
2303 let _ = forwarder.await;
2304 if should_auto_save.load(std::sync::atomic::Ordering::Acquire) {
2305 if let Some(persistence) = persistence {
2306 persistence.auto_save_if_enabled().await;
2307 }
2308 }
2309 *cancel_token_ref.lock().await = None;
2310 active_tools.write().await.clear();
2311 let mut current = current_run_id.lock().await;
2312 if current.as_deref() == Some(run_id.as_str()) {
2313 *current = None;
2314 }
2315 });
2316
2317 Ok((rx, wrapped_handle))
2318 }
2319
2320 pub async fn stream(
2329 &self,
2330 prompt: &str,
2331 history: Option<&[Message]>,
2332 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2333 if CommandRegistry::is_command(prompt) {
2335 let ctx = self.build_command_context();
2336 let output = self.command_registry().dispatch(prompt, &ctx);
2337 if let Some(output) = output {
2339 let (tx, rx) = mpsc::channel(256);
2340
2341 if let Some(CommandAction::BtwQuery(question)) = output.action {
2343 let llm_client = self.llm_client.clone();
2345 let history_snapshot = read_or_recover(&self.history).clone();
2346 let handle = tokio::spawn(async move {
2347 let mut messages = history_snapshot;
2348 messages.push(Message::user(&question));
2349 match llm_client
2350 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2351 .await
2352 {
2353 Ok(response) => {
2354 let answer = response.text();
2355 let _ = tx
2356 .send(AgentEvent::BtwAnswer {
2357 question: question.clone(),
2358 answer: answer.clone(),
2359 usage: response.usage,
2360 })
2361 .await;
2362 let _ = tx
2363 .send(AgentEvent::End {
2364 text: answer,
2365 usage: crate::llm::TokenUsage::default(),
2366 verification_summary: Box::new(
2367 crate::verification::VerificationSummary::from_reports(
2368 &[],
2369 ),
2370 ),
2371 meta: None,
2372 })
2373 .await;
2374 }
2375 Err(e) => {
2376 let _ = tx
2377 .send(AgentEvent::Error {
2378 message: format!("btw failed: {e}"),
2379 })
2380 .await;
2381 }
2382 }
2383 });
2384 return Ok((rx, handle));
2385 }
2386
2387 let handle = tokio::spawn(async move {
2388 let _ = tx
2389 .send(AgentEvent::TextDelta {
2390 text: output.text.clone(),
2391 })
2392 .await;
2393 let _ = tx
2394 .send(AgentEvent::End {
2395 text: output.text.clone(),
2396 usage: crate::llm::TokenUsage::default(),
2397 verification_summary: Box::new(
2398 crate::verification::VerificationSummary::from_reports(&[]),
2399 ),
2400 meta: None,
2401 })
2402 .await;
2403 });
2404 return Ok((rx, handle));
2405 }
2406 }
2407
2408 let (tx, rx) = mpsc::channel(256);
2409 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2410 let agent_loop = self.build_agent_loop();
2411 let use_internal = history.is_none();
2412 let effective_history = match history {
2413 Some(h) => h.to_vec(),
2414 None => read_or_recover(&self.history).clone(),
2415 };
2416 let run = self.start_run(prompt).await;
2417 let run_id = run.id().to_string();
2418 let prompt = prompt.to_string();
2419 let session_id = self.session_id.clone();
2420 let persistence = use_internal.then(|| SessionPersistenceContext::from_session(self));
2421
2422 let cancel_token = tokio_util::sync::CancellationToken::new();
2423 *self.cancel_token.lock().await = Some(cancel_token.clone());
2424 let token_clone = cancel_token.clone();
2425 let runtime_state = Arc::clone(&self.active_tools);
2426 let run_store = Arc::clone(&self.run_store);
2427 let forwarder_run_id = run_id.clone();
2428 let forwarder_session_id = self.session_id.clone();
2429 let forwarder_hook_executor = self.ahp_executor.clone();
2430 let should_auto_save = Arc::new(std::sync::atomic::AtomicBool::new(false));
2431 let forwarder = tokio::spawn(async move {
2432 while let Some(event) = runtime_rx.recv().await {
2433 AgentSession::record_runtime_event(
2434 &run_store,
2435 &forwarder_run_id,
2436 &forwarder_session_id,
2437 &forwarder_hook_executor,
2438 &event,
2439 )
2440 .await;
2441 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2442 if tx.send(event).await.is_err() {
2443 tracing::warn!("stream forwarder: receiver dropped, stopping event forward");
2446 break;
2447 }
2448 }
2449 });
2450
2451 let run_store = Arc::clone(&self.run_store);
2452 let worker_run_id = run_id.clone();
2453 let persistence_for_worker = persistence.clone();
2454 let should_auto_save_for_worker = Arc::clone(&should_auto_save);
2455 let handle = tokio::spawn(async move {
2456 let result = agent_loop
2457 .execute_with_session(
2458 &effective_history,
2459 &prompt,
2460 Some(&session_id),
2461 Some(runtime_tx),
2462 Some(&token_clone),
2463 )
2464 .await;
2465 match result {
2466 Ok(result) => {
2467 if let Some(persistence) = persistence_for_worker {
2468 persistence.record_result(&result);
2469 should_auto_save_for_worker
2470 .store(true, std::sync::atomic::Ordering::Release);
2471 }
2472 }
2473 Err(error) => {
2474 let _ = run_store
2475 .mark_failed(&worker_run_id, error.to_string())
2476 .await;
2477 }
2478 }
2479 });
2480
2481 let cancel_token_ref = self.cancel_token.clone();
2483 let active_tools = Arc::clone(&self.active_tools);
2484 let current_run_id = Arc::clone(&self.current_run_id);
2485 let wrapped_handle = tokio::spawn(async move {
2486 let _ = handle.await;
2487 let _ = forwarder.await;
2488 if should_auto_save.load(std::sync::atomic::Ordering::Acquire) {
2489 if let Some(persistence) = persistence {
2490 persistence.auto_save_if_enabled().await;
2491 }
2492 }
2493 *cancel_token_ref.lock().await = None;
2494 active_tools.write().await.clear();
2495 let mut current = current_run_id.lock().await;
2496 if current.as_deref() == Some(run_id.as_str()) {
2497 *current = None;
2498 }
2499 });
2500
2501 Ok((rx, wrapped_handle))
2502 }
2503
2504 pub async fn cancel(&self) -> bool {
2511 let token = self.cancel_token.lock().await.clone();
2512 if let Some(token) = token {
2513 token.cancel();
2514 if let Some(run_id) = self.current_run_id.lock().await.clone() {
2515 let _ = self.run_store.mark_cancelled(&run_id).await;
2516 if let Some(executor) = &self.ahp_executor {
2517 executor
2518 .record_run_cancelled(&run_id, &self.session_id, Some("cancelled by host"))
2519 .await;
2520 }
2521 }
2522 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2523 true
2524 } else {
2525 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2526 false
2527 }
2528 }
2529
2530 pub async fn cancel_run(&self, run_id: &str) -> bool {
2535 match self.current_run().await {
2536 Some(run) if run.id() == run_id => run.cancel().await,
2537 _ => false,
2538 }
2539 }
2540
2541 pub async fn runs(&self) -> Vec<crate::run::RunSnapshot> {
2543 self.run_store.list().await
2544 }
2545
2546 pub async fn run_snapshot(&self, run_id: &str) -> Option<crate::run::RunSnapshot> {
2548 self.run_store.snapshot(run_id).await
2549 }
2550
2551 pub async fn run_events(&self, run_id: &str) -> Vec<crate::run::RunEventRecord> {
2553 self.run_store.events(run_id).await
2554 }
2555
2556 pub async fn current_run(&self) -> Option<crate::run::RunHandle> {
2558 let run_id = self.current_run_id.lock().await.clone()?;
2559 let snapshot = self.run_store.snapshot(&run_id).await?;
2560 Some(crate::run::RunHandle::new(
2561 snapshot.id,
2562 snapshot.session_id,
2563 Arc::clone(&self.run_store),
2564 Arc::clone(&self.cancel_token),
2565 Arc::clone(&self.current_run_id),
2566 self.ahp_executor.clone(),
2567 ))
2568 }
2569
2570 pub fn history(&self) -> Vec<Message> {
2572 read_or_recover(&self.history).clone()
2573 }
2574
2575 pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
2577 self.memory.as_ref()
2578 }
2579
2580 pub fn id(&self) -> &str {
2582 &self.session_id
2583 }
2584
2585 pub fn workspace(&self) -> &std::path::Path {
2587 &self.workspace
2588 }
2589
2590 pub fn init_warning(&self) -> Option<&str> {
2592 self.init_warning.as_deref()
2593 }
2594
2595 pub fn session_id(&self) -> &str {
2597 &self.session_id
2598 }
2599
2600 pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
2606 self.tool_executor.definitions()
2607 }
2608
2609 pub fn tool_names(&self) -> Vec<String> {
2615 self.tool_executor
2616 .definitions()
2617 .into_iter()
2618 .map(|t| t.name)
2619 .collect()
2620 }
2621
2622 pub fn get_artifact(&self, artifact_uri: &str) -> Option<crate::tools::ToolArtifact> {
2624 self.tool_executor.get_artifact(artifact_uri)
2625 }
2626
2627 pub fn trace_events(&self) -> Vec<crate::trace::TraceEvent> {
2629 self.trace_sink.events()
2630 }
2631
2632 pub fn verification_reports(&self) -> Vec<crate::verification::VerificationReport> {
2634 read_or_recover(&self.verification_reports).clone()
2635 }
2636
2637 pub fn verification_summary(&self) -> crate::verification::VerificationSummary {
2639 crate::verification::VerificationSummary::from_reports(&self.verification_reports())
2640 }
2641
2642 pub fn verification_summary_text(&self) -> String {
2644 crate::verification::format_verification_summary(&self.verification_summary())
2645 }
2646
2647 pub fn record_verification_reports(
2649 &self,
2650 reports: impl IntoIterator<Item = crate::verification::VerificationReport>,
2651 ) {
2652 let mut target = write_or_recover(&self.verification_reports);
2653 target.extend(reports);
2654 }
2655
2656 pub fn register_hook(&self, hook: crate::hooks::Hook) {
2662 self.hook_engine.register(hook);
2663 }
2664
2665 pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
2667 self.hook_engine.unregister(hook_id)
2668 }
2669
2670 pub fn register_hook_handler(
2672 &self,
2673 hook_id: &str,
2674 handler: Arc<dyn crate::hooks::HookHandler>,
2675 ) {
2676 self.hook_engine.register_handler(hook_id, handler);
2677 }
2678
2679 pub fn unregister_hook_handler(&self, hook_id: &str) {
2681 self.hook_engine.unregister_handler(hook_id);
2682 }
2683
2684 pub fn hook_count(&self) -> usize {
2686 self.hook_engine.hook_count()
2687 }
2688
2689 pub async fn save(&self) -> Result<()> {
2693 SessionPersistenceContext::from_session(self).save().await
2694 }
2695
2696 pub async fn read_file(&self, path: &str) -> Result<String> {
2698 let args = serde_json::json!({ "file_path": path });
2699 let result = self.tool_executor.execute("read", &args).await?;
2700 Ok(result.output)
2701 }
2702
2703 pub async fn bash(&self, command: &str) -> Result<String> {
2709 let args = serde_json::json!({ "command": command });
2710 let result = self
2711 .tool_executor
2712 .execute_with_context("bash", &args, &self.tool_context)
2713 .await?;
2714 Ok(result.output)
2715 }
2716
2717 pub async fn verify_commands(
2719 &self,
2720 subject: &str,
2721 commands: &[crate::verification::VerificationCommand],
2722 ) -> Result<crate::verification::VerificationReport> {
2723 let mut checks = Vec::with_capacity(commands.len());
2724
2725 for command in commands {
2726 let mut args = serde_json::json!({ "command": command.command });
2727 if let Some(timeout_ms) = command.timeout_ms {
2728 args["timeout"] = serde_json::json!(timeout_ms);
2729 }
2730
2731 let check = match self
2732 .tool_executor
2733 .execute_with_context("bash", &args, &self.tool_context)
2734 .await
2735 {
2736 Ok(result) => {
2737 let exit_code = result
2738 .metadata
2739 .as_ref()
2740 .and_then(|metadata| metadata.get("exit_code"))
2741 .and_then(|value| value.as_i64())
2742 .and_then(|value| i32::try_from(value).ok())
2743 .unwrap_or(result.exit_code);
2744 command.check_from_execution(exit_code, result.metadata.as_ref(), None)
2745 }
2746 Err(err) => command.check_from_execution(1, None, Some(&err.to_string())),
2747 };
2748 checks.push(check);
2749 }
2750
2751 let report = crate::verification::VerificationReport::new(subject, checks);
2752 self.record_verification_reports([report.clone()]);
2753 if self.auto_save {
2754 if let Err(e) = self.save().await {
2755 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2756 }
2757 }
2758
2759 Ok(report)
2760 }
2761
2762 pub fn verification_presets(&self) -> Vec<crate::verification::VerificationPreset> {
2764 crate::verification::verification_presets_for_workspace(&self.workspace)
2765 }
2766
2767 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2769 let args = serde_json::json!({ "pattern": pattern });
2770 let result = self.tool_executor.execute("glob", &args).await?;
2771 let files: Vec<String> = result
2772 .output
2773 .lines()
2774 .filter(|l| !l.is_empty())
2775 .map(|l| l.to_string())
2776 .collect();
2777 Ok(files)
2778 }
2779
2780 pub async fn grep(&self, pattern: &str) -> Result<String> {
2782 let args = serde_json::json!({ "pattern": pattern });
2783 let result = self.tool_executor.execute("grep", &args).await?;
2784 Ok(result.output)
2785 }
2786
2787 pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
2789 let result = self.tool_executor.execute(name, &args).await?;
2790 Ok(ToolCallResult {
2791 name: name.to_string(),
2792 output: result.output,
2793 exit_code: result.exit_code,
2794 metadata: result.metadata,
2795 })
2796 }
2797
2798 pub fn has_queue(&self) -> bool {
2804 self.command_queue.is_some()
2805 }
2806
2807 pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
2811 if let Some(ref queue) = self.command_queue {
2812 queue.set_lane_handler(lane, config).await;
2813 }
2814 }
2815
2816 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2820 if let Some(ref queue) = self.command_queue {
2821 queue.complete_external_task(task_id, result).await
2822 } else {
2823 false
2824 }
2825 }
2826
2827 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2829 if let Some(ref queue) = self.command_queue {
2830 queue.pending_external_tasks().await
2831 } else {
2832 Vec::new()
2833 }
2834 }
2835
2836 pub async fn queue_stats(&self) -> SessionQueueStats {
2838 if let Some(ref queue) = self.command_queue {
2839 queue.stats().await
2840 } else {
2841 SessionQueueStats::default()
2842 }
2843 }
2844
2845 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2847 if let Some(ref queue) = self.command_queue {
2848 queue.metrics_snapshot().await
2849 } else {
2850 None
2851 }
2852 }
2853
2854 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2856 if let Some(ref queue) = self.command_queue {
2857 queue.dead_letters().await
2858 } else {
2859 Vec::new()
2860 }
2861 }
2862
2863 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2876 use crate::subagent::load_agents_from_dir;
2877 let agents = load_agents_from_dir(dir);
2878 let count = agents.len();
2879 for agent in agents {
2880 tracing::info!(
2881 session_id = %self.session_id,
2882 agent = agent.name,
2883 dir = %dir.display(),
2884 "Dynamically registered agent"
2885 );
2886 self.agent_registry.register(agent);
2887 }
2888 count
2889 }
2890
2891 pub async fn add_mcp_server(
2898 &self,
2899 config: crate::mcp::McpServerConfig,
2900 ) -> crate::error::Result<usize> {
2901 let server_name = config.name.clone();
2902 self.mcp_manager.register_server(config).await;
2903 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2904 crate::error::CodeError::Tool {
2905 tool: server_name.clone(),
2906 message: format!("Failed to connect MCP server: {}", e),
2907 }
2908 })?;
2909
2910 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2911 let count = tools.len();
2912
2913 for tool in
2914 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2915 {
2916 self.tool_executor.register_dynamic_tool(tool);
2917 }
2918
2919 tracing::info!(
2920 session_id = %self.session_id,
2921 server = server_name,
2922 tools = count,
2923 "MCP server added to live session"
2924 );
2925
2926 Ok(count)
2927 }
2928
2929 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2934 self.tool_executor
2935 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2936 self.mcp_manager
2937 .disconnect(server_name)
2938 .await
2939 .map_err(|e| crate::error::CodeError::Tool {
2940 tool: server_name.to_string(),
2941 message: format!("Failed to disconnect MCP server: {}", e),
2942 })?;
2943 tracing::info!(
2944 session_id = %self.session_id,
2945 server = server_name,
2946 "MCP server removed from live session"
2947 );
2948 Ok(())
2949 }
2950
2951 pub async fn mcp_status(
2953 &self,
2954 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2955 self.mcp_manager.get_status().await
2956 }
2957}
2958
2959#[cfg(test)]
2964mod tests {
2965 use super::*;
2966 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2967 use crate::llm::{ContentBlock, LlmResponse, StreamEvent, TokenUsage};
2968 use crate::store::SessionStore;
2969
2970 #[derive(Clone)]
2971 struct StaticStreamingClient {
2972 text: String,
2973 }
2974
2975 impl StaticStreamingClient {
2976 fn new(text: impl Into<String>) -> Self {
2977 Self { text: text.into() }
2978 }
2979
2980 fn response(&self) -> LlmResponse {
2981 LlmResponse {
2982 message: Message {
2983 role: "assistant".to_string(),
2984 content: vec![ContentBlock::Text {
2985 text: self.text.clone(),
2986 }],
2987 reasoning_content: None,
2988 },
2989 usage: TokenUsage {
2990 prompt_tokens: 1,
2991 completion_tokens: 1,
2992 total_tokens: 2,
2993 cache_read_tokens: None,
2994 cache_write_tokens: None,
2995 },
2996 stop_reason: Some("end_turn".to_string()),
2997 meta: None,
2998 }
2999 }
3000 }
3001
3002 #[derive(Clone)]
3003 struct FailingStreamingClient;
3004
3005 #[derive(Clone)]
3006 struct CancellableStreamingClient {
3007 text: String,
3008 }
3009
3010 #[derive(Debug, Default)]
3011 struct RecordingRuntimeHook {
3012 events: std::sync::Mutex<Vec<(String, String, AgentEvent)>>,
3013 }
3014
3015 #[derive(Debug, Default)]
3016 struct CapturingContextProvider {
3017 session_ids: std::sync::Mutex<Vec<Option<String>>>,
3018 }
3019
3020 #[async_trait::async_trait]
3021 impl crate::context::ContextProvider for CapturingContextProvider {
3022 fn name(&self) -> &str {
3023 "capturing-context"
3024 }
3025
3026 async fn query(
3027 &self,
3028 query: &crate::context::ContextQuery,
3029 ) -> anyhow::Result<crate::context::ContextResult> {
3030 self.session_ids
3031 .lock()
3032 .unwrap()
3033 .push(query.session_id.clone());
3034 Ok(crate::context::ContextResult::new(self.name()))
3035 }
3036 }
3037
3038 #[async_trait::async_trait]
3039 impl crate::hooks::HookExecutor for RecordingRuntimeHook {
3040 async fn fire(&self, _event: &crate::hooks::HookEvent) -> crate::hooks::HookResult {
3041 crate::hooks::HookResult::Continue(None)
3042 }
3043
3044 async fn record_agent_event(&self, event: &AgentEvent, run_id: &str, session_id: &str) {
3045 self.events.lock().unwrap().push((
3046 run_id.to_string(),
3047 session_id.to_string(),
3048 event.clone(),
3049 ));
3050 }
3051 }
3052
3053 impl CancellableStreamingClient {
3054 fn new(text: impl Into<String>) -> Self {
3055 Self { text: text.into() }
3056 }
3057 }
3058
3059 #[async_trait::async_trait]
3060 impl LlmClient for StaticStreamingClient {
3061 async fn complete(
3062 &self,
3063 _messages: &[Message],
3064 _system: Option<&str>,
3065 _tools: &[crate::llm::ToolDefinition],
3066 ) -> anyhow::Result<LlmResponse> {
3067 Ok(self.response())
3068 }
3069
3070 async fn complete_streaming(
3071 &self,
3072 _messages: &[Message],
3073 _system: Option<&str>,
3074 _tools: &[crate::llm::ToolDefinition],
3075 _cancel_token: tokio_util::sync::CancellationToken,
3076 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3077 let (tx, rx) = mpsc::channel(8);
3078 let text = self.text.clone();
3079 let response = self.response();
3080 tokio::spawn(async move {
3081 let _ = tx.send(StreamEvent::TextDelta(text)).await;
3082 let _ = tx.send(StreamEvent::Done(response)).await;
3083 });
3084 Ok(rx)
3085 }
3086 }
3087
3088 #[async_trait::async_trait]
3089 impl LlmClient for FailingStreamingClient {
3090 async fn complete(
3091 &self,
3092 _messages: &[Message],
3093 _system: Option<&str>,
3094 _tools: &[crate::llm::ToolDefinition],
3095 ) -> anyhow::Result<LlmResponse> {
3096 anyhow::bail!("non-streaming fallback failed")
3097 }
3098
3099 async fn complete_streaming(
3100 &self,
3101 _messages: &[Message],
3102 _system: Option<&str>,
3103 _tools: &[crate::llm::ToolDefinition],
3104 _cancel_token: tokio_util::sync::CancellationToken,
3105 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3106 anyhow::bail!("streaming setup failed")
3107 }
3108 }
3109
3110 #[async_trait::async_trait]
3111 impl LlmClient for CancellableStreamingClient {
3112 async fn complete(
3113 &self,
3114 _messages: &[Message],
3115 _system: Option<&str>,
3116 _tools: &[crate::llm::ToolDefinition],
3117 ) -> anyhow::Result<LlmResponse> {
3118 anyhow::bail!("cancellable client does not support fallback completion")
3119 }
3120
3121 async fn complete_streaming(
3122 &self,
3123 _messages: &[Message],
3124 _system: Option<&str>,
3125 _tools: &[crate::llm::ToolDefinition],
3126 cancel_token: tokio_util::sync::CancellationToken,
3127 ) -> anyhow::Result<mpsc::Receiver<StreamEvent>> {
3128 let (tx, rx) = mpsc::channel(8);
3129 let text = self.text.clone();
3130 tokio::spawn(async move {
3131 let _ = tx.send(StreamEvent::TextDelta(text)).await;
3132 cancel_token.cancelled().await;
3133 });
3134 Ok(rx)
3135 }
3136 }
3137
3138 fn test_config() -> CodeConfig {
3139 CodeConfig {
3140 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
3141 providers: vec![
3142 ProviderConfig {
3143 name: "anthropic".to_string(),
3144 api_key: Some("test-key".to_string()),
3145 base_url: None,
3146 headers: std::collections::HashMap::new(),
3147 session_id_header: None,
3148 models: vec![ModelConfig {
3149 id: "claude-sonnet-4-20250514".to_string(),
3150 name: "Claude Sonnet 4".to_string(),
3151 family: "claude-sonnet".to_string(),
3152 api_key: None,
3153 base_url: None,
3154 headers: std::collections::HashMap::new(),
3155 session_id_header: None,
3156 attachment: false,
3157 reasoning: false,
3158 tool_call: true,
3159 temperature: true,
3160 release_date: None,
3161 modalities: ModelModalities::default(),
3162 cost: Default::default(),
3163 limit: Default::default(),
3164 }],
3165 },
3166 ProviderConfig {
3167 name: "openai".to_string(),
3168 api_key: Some("test-openai-key".to_string()),
3169 base_url: None,
3170 headers: std::collections::HashMap::new(),
3171 session_id_header: None,
3172 models: vec![ModelConfig {
3173 id: "gpt-4o".to_string(),
3174 name: "GPT-4o".to_string(),
3175 family: "gpt-4".to_string(),
3176 api_key: None,
3177 base_url: None,
3178 headers: std::collections::HashMap::new(),
3179 session_id_header: None,
3180 attachment: false,
3181 reasoning: false,
3182 tool_call: true,
3183 temperature: true,
3184 release_date: None,
3185 modalities: ModelModalities::default(),
3186 cost: Default::default(),
3187 limit: Default::default(),
3188 }],
3189 },
3190 ],
3191 ..Default::default()
3192 }
3193 }
3194
3195 fn build_effective_registry_for_test(
3196 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
3197 opts: &SessionOptions,
3198 ) -> Arc<crate::skills::SkillRegistry> {
3199 let base_registry = agent_registry
3200 .as_deref()
3201 .map(|r| r.fork())
3202 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
3203 if let Some(ref r) = opts.skill_registry {
3204 for skill in r.all() {
3205 base_registry.register_unchecked(skill);
3206 }
3207 }
3208 for dir in &opts.skill_dirs {
3209 if let Err(e) = base_registry.load_from_dir(dir) {
3210 tracing::warn!(
3211 dir = %dir.display(),
3212 error = %e,
3213 "Failed to load session skill dir — skipping"
3214 );
3215 }
3216 }
3217 Arc::new(base_registry)
3218 }
3219
3220 #[tokio::test]
3221 async fn test_from_config() {
3222 let agent = Agent::from_config(test_config()).await;
3223 assert!(agent.is_ok());
3224 }
3225
3226 #[tokio::test]
3227 async fn test_session_default() {
3228 let agent = Agent::from_config(test_config()).await.unwrap();
3229 let session = agent.session("/tmp/test-workspace", None);
3230 assert!(session.is_ok());
3231 let debug = format!("{:?}", session.unwrap());
3232 assert!(debug.contains("AgentSession"));
3233 }
3234
3235 #[tokio::test]
3236 async fn test_session_routes_agents_md_through_context_provider() {
3237 let temp_dir = tempfile::tempdir().unwrap();
3238 std::fs::write(
3239 temp_dir.path().join("AGENTS.md"),
3240 "Always run focused tests before reporting completion.",
3241 )
3242 .unwrap();
3243
3244 let agent = Agent::from_config(test_config()).await.unwrap();
3245 let session = agent
3246 .session(temp_dir.path().display().to_string(), None)
3247 .unwrap();
3248
3249 let agents_provider = session
3250 .config
3251 .context_providers
3252 .iter()
3253 .find(|provider| provider.name() == "agents_md")
3254 .expect("AGENTS.md provider should be registered");
3255 assert!(!session
3256 .config
3257 .prompt_slots
3258 .extra
3259 .as_deref()
3260 .unwrap_or_default()
3261 .contains("Project Instructions (AGENTS.md)"));
3262
3263 let result = agents_provider
3264 .query(&crate::context::ContextQuery::new("complete the task"))
3265 .await
3266 .unwrap();
3267
3268 assert_eq!(result.items.len(), 1);
3269 assert_eq!(result.items[0].id, "agents_md");
3270 assert!(result.items[0]
3271 .content
3272 .contains("Always run focused tests before reporting completion."));
3273 assert_eq!(result.items[0].relevance, 0.95);
3274 }
3275
3276 #[tokio::test]
3277 async fn test_session_initializes_without_legacy_agentic_tools() {
3278 let agent = Agent::from_config(test_config()).await.unwrap();
3279 let _session = agent.session("/tmp/test-workspace", None).unwrap();
3280 }
3281
3282 #[tokio::test]
3283 async fn test_session_with_model_override() {
3284 let agent = Agent::from_config(test_config()).await.unwrap();
3285 let opts = SessionOptions::new().with_model("openai/gpt-4o");
3286 let session = agent.session("/tmp/test-workspace", Some(opts));
3287 assert!(session.is_ok());
3288 }
3289
3290 #[tokio::test]
3291 async fn test_session_with_invalid_model_format() {
3292 let agent = Agent::from_config(test_config()).await.unwrap();
3293 let opts = SessionOptions::new().with_model("gpt-4o");
3294 let session = agent.session("/tmp/test-workspace", Some(opts));
3295 assert!(session.is_err());
3296 }
3297
3298 #[tokio::test]
3299 async fn test_session_with_model_not_found() {
3300 let agent = Agent::from_config(test_config()).await.unwrap();
3301 let opts = SessionOptions::new().with_model("openai/nonexistent");
3302 let session = agent.session("/tmp/test-workspace", Some(opts));
3303 assert!(session.is_err());
3304 }
3305
3306 #[tokio::test]
3307 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
3308 use crate::skills::validator::DefaultSkillValidator;
3309 use crate::skills::SkillRegistry;
3310
3311 let registry = Arc::new(SkillRegistry::new());
3312 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
3313
3314 let temp_dir = tempfile::tempdir().unwrap();
3315 let invalid_skill = temp_dir.path().join("invalid.md");
3316 std::fs::write(
3317 &invalid_skill,
3318 r#"---
3319name: BadName
3320description: "invalid skill name"
3321kind: instruction
3322---
3323# Invalid Skill
3324"#,
3325 )
3326 .unwrap();
3327
3328 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
3329 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
3330 assert!(effective_registry.get("BadName").is_none());
3331 }
3332
3333 #[tokio::test]
3334 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
3335 use crate::skills::{Skill, SkillKind, SkillRegistry};
3336
3337 let registry = Arc::new(SkillRegistry::new());
3338 registry.register_unchecked(Arc::new(Skill {
3339 name: "shared-skill".to_string(),
3340 description: "agent level".to_string(),
3341 allowed_tools: None,
3342 disable_model_invocation: false,
3343 kind: SkillKind::Instruction,
3344 content: "agent content".to_string(),
3345 tags: vec![],
3346 version: None,
3347 }));
3348
3349 let session_registry = Arc::new(SkillRegistry::new());
3350 session_registry.register_unchecked(Arc::new(Skill {
3351 name: "shared-skill".to_string(),
3352 description: "session level".to_string(),
3353 allowed_tools: None,
3354 disable_model_invocation: false,
3355 kind: SkillKind::Instruction,
3356 content: "session content".to_string(),
3357 tags: vec![],
3358 version: None,
3359 }));
3360
3361 let opts = SessionOptions::new().with_skill_registry(session_registry);
3362 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
3363
3364 assert_eq!(
3365 effective_registry.get("shared-skill").unwrap().content,
3366 "session content"
3367 );
3368 assert_eq!(
3369 registry.get("shared-skill").unwrap().content,
3370 "agent content"
3371 );
3372 }
3373
3374 #[tokio::test]
3375 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3376 use crate::skills::{Skill, SkillKind, SkillRegistry};
3377
3378 let session_registry = Arc::new(SkillRegistry::new());
3379 session_registry.register_unchecked(Arc::new(Skill {
3380 name: "shared-skill".to_string(),
3381 description: "session registry".to_string(),
3382 allowed_tools: None,
3383 disable_model_invocation: false,
3384 kind: SkillKind::Instruction,
3385 content: "registry content".to_string(),
3386 tags: vec![],
3387 version: None,
3388 }));
3389
3390 let temp_dir = tempfile::tempdir().unwrap();
3391 std::fs::write(
3392 temp_dir.path().join("shared.md"),
3393 r#"---
3394name: shared-skill
3395description: "skill dir override"
3396kind: instruction
3397---
3398# Shared Skill
3399dir content
3400"#,
3401 )
3402 .unwrap();
3403 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3404
3405 let opts = SessionOptions::new()
3406 .with_skill_registry(session_registry)
3407 .with_skill_dirs([temp_dir.path()]);
3408 let effective_registry = build_effective_registry_for_test(None, &opts);
3409
3410 assert_eq!(
3411 effective_registry.get("shared-skill").unwrap().description,
3412 "skill dir override"
3413 );
3414 assert!(effective_registry.get("README").is_none());
3415 }
3416
3417 #[tokio::test]
3418 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3419 use crate::skills::{Skill, SkillKind, SkillRegistry};
3420
3421 let mut agent = Agent::from_config(test_config()).await.unwrap();
3422 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3423 agent.config.skill_registry = Some(agent_registry);
3424
3425 let session_registry = Arc::new(SkillRegistry::new());
3426 session_registry.register_unchecked(Arc::new(Skill {
3427 name: "session-only".to_string(),
3428 description: "only for first session".to_string(),
3429 allowed_tools: None,
3430 disable_model_invocation: false,
3431 kind: SkillKind::Instruction,
3432 content: "session one".to_string(),
3433 tags: vec![],
3434 version: None,
3435 }));
3436
3437 let session_one = agent
3438 .session(
3439 "/tmp/test-workspace",
3440 Some(SessionOptions::new().with_skill_registry(session_registry)),
3441 )
3442 .unwrap();
3443 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3444
3445 assert!(session_one
3446 .config
3447 .skill_registry
3448 .as_ref()
3449 .unwrap()
3450 .get("session-only")
3451 .is_some());
3452 assert!(session_two
3453 .config
3454 .skill_registry
3455 .as_ref()
3456 .unwrap()
3457 .get("session-only")
3458 .is_none());
3459 }
3460
3461 #[tokio::test]
3462 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3463 use crate::skills::{Skill, SkillKind, SkillRegistry};
3464 use crate::subagent::AgentDefinition;
3465
3466 let mut agent = Agent::from_config(test_config()).await.unwrap();
3467 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3468
3469 let definition = AgentDefinition::new("reviewer", "Review code")
3470 .with_prompt("Agent definition prompt")
3471 .with_max_steps(7);
3472
3473 let session_registry = Arc::new(SkillRegistry::new());
3474 session_registry.register_unchecked(Arc::new(Skill {
3475 name: "agent-session-skill".to_string(),
3476 description: "agent session only".to_string(),
3477 allowed_tools: None,
3478 disable_model_invocation: false,
3479 kind: SkillKind::Instruction,
3480 content: "agent session content".to_string(),
3481 tags: vec![],
3482 version: None,
3483 }));
3484
3485 let session_one = agent
3486 .session_for_agent(
3487 "/tmp/test-workspace",
3488 &definition,
3489 Some(SessionOptions::new().with_skill_registry(session_registry)),
3490 )
3491 .unwrap();
3492 let session_two = agent
3493 .session_for_agent("/tmp/test-workspace", &definition, None)
3494 .unwrap();
3495
3496 assert_eq!(session_one.config.max_tool_rounds, 7);
3497 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3498 assert!(extra.contains("Agent definition prompt"));
3499 assert!(!extra.contains("agent-session-skill"));
3500 assert!(session_one
3501 .config
3502 .context_providers
3503 .iter()
3504 .any(|provider| provider.name() == "skills_catalog"));
3505 assert!(session_one
3506 .config
3507 .skill_registry
3508 .as_ref()
3509 .unwrap()
3510 .get("agent-session-skill")
3511 .is_some());
3512 assert!(session_two
3513 .config
3514 .skill_registry
3515 .as_ref()
3516 .unwrap()
3517 .get("agent-session-skill")
3518 .is_none());
3519 }
3520
3521 #[tokio::test]
3522 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3523 ) {
3524 use crate::prompts::SystemPromptSlots;
3525 use crate::subagent::AgentDefinition;
3526
3527 let agent = Agent::from_config(test_config()).await.unwrap();
3528 let definition = AgentDefinition::new("planner", "Plan work")
3529 .with_prompt("Definition extra prompt")
3530 .with_max_steps(3);
3531
3532 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3533 style: None,
3534 role: Some("Custom role".to_string()),
3535 guidelines: None,
3536 response_style: None,
3537 extra: None,
3538 });
3539
3540 let session = agent
3541 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3542 .unwrap();
3543
3544 assert_eq!(
3545 session.config.prompt_slots.role.as_deref(),
3546 Some("Custom role")
3547 );
3548 assert!(session
3549 .config
3550 .prompt_slots
3551 .extra
3552 .as_deref()
3553 .unwrap()
3554 .contains("Definition extra prompt"));
3555 assert_eq!(session.config.max_tool_rounds, 3);
3556 }
3557
3558 #[tokio::test]
3559 async fn test_new_with_acl_string() {
3560 let acl = r#"
3561 default_model = "anthropic/claude-sonnet-4-20250514"
3562 providers "anthropic" {
3563 apiKey = "test-key"
3564 models "claude-sonnet-4-20250514" {
3565 name = "Claude Sonnet 4"
3566 }
3567 }
3568 "#;
3569 let agent = Agent::new(acl).await;
3570 assert!(agent.is_ok());
3571 }
3572
3573 #[tokio::test]
3574 async fn test_create_alias_acl() {
3575 let acl = r#"
3576 default_model = "anthropic/claude-sonnet-4-20250514"
3577 providers "anthropic" {
3578 apiKey = "test-key"
3579 models "claude-sonnet-4-20250514" {
3580 name = "Claude Sonnet 4"
3581 }
3582 }
3583 "#;
3584 let agent = Agent::create(acl).await;
3585 assert!(agent.is_ok());
3586 }
3587
3588 #[tokio::test]
3589 async fn test_create_and_new_produce_same_result() {
3590 let acl = r#"
3591 default_model = "anthropic/claude-sonnet-4-20250514"
3592 providers "anthropic" {
3593 apiKey = "test-key"
3594 models "claude-sonnet-4-20250514" {
3595 name = "Claude Sonnet 4"
3596 }
3597 }
3598 "#;
3599 let agent_new = Agent::new(acl).await;
3600 let agent_create = Agent::create(acl).await;
3601 assert!(agent_new.is_ok());
3602 assert!(agent_create.is_ok());
3603
3604 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3606 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3607 assert!(session_new.is_ok());
3608 assert!(session_create.is_ok());
3609 }
3610
3611 #[tokio::test]
3612 async fn test_new_with_existing_acl_file_uses_file_loading() {
3613 let temp_dir = tempfile::tempdir().unwrap();
3614 let config_path = temp_dir.path().join("agent.acl");
3615 std::fs::write(&config_path, "providers {").unwrap();
3616
3617 let err = Agent::new(config_path.display().to_string())
3618 .await
3619 .unwrap_err();
3620 let msg = err.to_string();
3621
3622 assert!(msg.contains("Failed to load config"));
3623 assert!(msg.contains("agent.acl"));
3624 assert!(!msg.contains("Failed to parse config as ACL string"));
3625 }
3626
3627 #[tokio::test]
3628 async fn test_new_with_missing_acl_file_reports_not_found() {
3629 let temp_dir = tempfile::tempdir().unwrap();
3630 let missing_path = temp_dir.path().join("agent.acl");
3631
3632 let err = Agent::new(missing_path.display().to_string())
3633 .await
3634 .unwrap_err();
3635 let msg = err.to_string();
3636
3637 assert!(msg.contains("Config file not found"));
3638 assert!(msg.contains("agent.acl"));
3639 assert!(!msg.contains("Failed to parse config as ACL string"));
3640 }
3641
3642 #[tokio::test]
3643 async fn test_new_rejects_hcl_files() {
3644 let temp_dir = tempfile::tempdir().unwrap();
3645 let config_path = temp_dir.path().join("agent.hcl");
3646 std::fs::write(&config_path, "default_model = \"openai/test\"").unwrap();
3647
3648 let err = Agent::new(config_path.display().to_string())
3649 .await
3650 .unwrap_err();
3651 let msg = err.to_string();
3652
3653 assert!(msg.contains("HCL config files are not supported in 2.0"));
3654 assert!(msg.contains(".acl"));
3655 }
3656
3657 #[test]
3658 fn test_from_config_requires_default_model() {
3659 let rt = tokio::runtime::Runtime::new().unwrap();
3660 let config = CodeConfig {
3661 providers: vec![ProviderConfig {
3662 name: "anthropic".to_string(),
3663 api_key: Some("test-key".to_string()),
3664 base_url: None,
3665 headers: std::collections::HashMap::new(),
3666 session_id_header: None,
3667 models: vec![],
3668 }],
3669 ..Default::default()
3670 };
3671 let result = rt.block_on(Agent::from_config(config));
3672 assert!(result.is_err());
3673 }
3674
3675 #[tokio::test]
3676 async fn test_history_empty_on_new_session() {
3677 let agent = Agent::from_config(test_config()).await.unwrap();
3678 let session = agent.session("/tmp/test-workspace", None).unwrap();
3679 assert!(session.history().is_empty());
3680 }
3681
3682 #[tokio::test]
3683 async fn test_stream_updates_history_and_auto_saves() {
3684 let store = Arc::new(crate::store::MemorySessionStore::new());
3685 let agent = Agent::from_config(test_config()).await.unwrap();
3686 let opts = SessionOptions::new()
3687 .with_session_store(store.clone())
3688 .with_session_id("stream-history-test")
3689 .with_auto_save(true);
3690 let session = agent
3691 .build_session(
3692 "/tmp/test-stream-history".into(),
3693 Arc::new(StaticStreamingClient::new("streamed answer")),
3694 &opts,
3695 )
3696 .unwrap();
3697
3698 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3699 let mut saw_end = false;
3700 while let Some(event) = rx.recv().await {
3701 if matches!(event, AgentEvent::End { .. }) {
3702 saw_end = true;
3703 break;
3704 }
3705 }
3706 handle.await.unwrap();
3707
3708 assert!(saw_end);
3709 let history = session.history();
3710 assert_eq!(history.len(), 2);
3711 assert_eq!(history[0].text(), "hello");
3712 assert_eq!(history[1].text(), "streamed answer");
3713
3714 let saved = store
3715 .load("stream-history-test")
3716 .await
3717 .unwrap()
3718 .expect("saved session");
3719 assert_eq!(saved.messages.len(), 2);
3720 assert_eq!(saved.messages[1].text(), "streamed answer");
3721
3722 let run_records = store
3723 .load_run_records("stream-history-test")
3724 .await
3725 .unwrap()
3726 .expect("saved run records");
3727 assert_eq!(run_records.len(), 1);
3728 assert_eq!(
3729 run_records[0].snapshot.status,
3730 crate::run::RunStatus::Completed
3731 );
3732 assert!(run_records[0]
3733 .events
3734 .iter()
3735 .any(|record| matches!(record.event, AgentEvent::End { .. })));
3736 }
3737
3738 #[tokio::test]
3739 async fn test_stream_with_custom_history_does_not_update_session_history() {
3740 let agent = Agent::from_config(test_config()).await.unwrap();
3741 let session = agent
3742 .build_session(
3743 "/tmp/test-stream-custom-history".into(),
3744 Arc::new(StaticStreamingClient::new("custom history answer")),
3745 &SessionOptions::new(),
3746 )
3747 .unwrap();
3748 let custom_history = vec![Message::user("custom prompt")];
3749
3750 let (mut rx, handle) = session
3751 .stream("ignored", Some(&custom_history))
3752 .await
3753 .unwrap();
3754 while let Some(event) = rx.recv().await {
3755 if matches!(event, AgentEvent::End { .. }) {
3756 break;
3757 }
3758 }
3759 handle.await.unwrap();
3760
3761 assert!(session.history().is_empty());
3762 }
3763
3764 #[tokio::test]
3765 async fn test_stream_error_does_not_update_history_or_auto_save() {
3766 let store = Arc::new(crate::store::MemorySessionStore::new());
3767 let agent = Agent::from_config(test_config()).await.unwrap();
3768 let opts = SessionOptions::new()
3769 .with_session_store(store.clone())
3770 .with_session_id("stream-error-test")
3771 .with_auto_save(true);
3772 let session = agent
3773 .build_session(
3774 "/tmp/test-stream-error".into(),
3775 Arc::new(FailingStreamingClient),
3776 &opts,
3777 )
3778 .unwrap();
3779
3780 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3781 let mut saw_error = false;
3782 while let Some(event) = rx.recv().await {
3783 if matches!(event, AgentEvent::Error { .. }) {
3784 saw_error = true;
3785 break;
3786 }
3787 }
3788 handle.await.unwrap();
3789
3790 assert!(saw_error);
3791 assert!(session.history().is_empty());
3792 assert!(store.load("stream-error-test").await.unwrap().is_none());
3793 }
3794
3795 #[tokio::test]
3796 async fn test_stream_cancel_does_not_update_history_or_auto_save() {
3797 let store = Arc::new(crate::store::MemorySessionStore::new());
3798 let agent = Agent::from_config(test_config()).await.unwrap();
3799 let opts = SessionOptions::new()
3800 .with_session_store(store.clone())
3801 .with_session_id("stream-cancel-test")
3802 .with_auto_save(true);
3803 let session = agent
3804 .build_session(
3805 "/tmp/test-stream-cancel".into(),
3806 Arc::new(CancellableStreamingClient::new("partial answer")),
3807 &opts,
3808 )
3809 .unwrap();
3810
3811 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
3812 let mut saw_delta = false;
3813 for _ in 0..16 {
3814 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
3815 .await
3816 .expect("stream event before timeout")
3817 .expect("stream should stay open until cancelled");
3818 if matches!(event, AgentEvent::TextDelta { ref text } if text == "partial answer") {
3819 saw_delta = true;
3820 break;
3821 }
3822 }
3823 assert!(saw_delta);
3824 assert!(session.cancel().await);
3825
3826 while rx.recv().await.is_some() {}
3827 handle.await.unwrap();
3828
3829 assert!(session.history().is_empty());
3830 assert!(store.load("stream-cancel-test").await.unwrap().is_none());
3831 assert!(!session.cancel().await);
3832 }
3833
3834 #[tokio::test]
3835 async fn test_stream_with_attachments_cancel_does_not_update_history_or_auto_save() {
3836 let store = Arc::new(crate::store::MemorySessionStore::new());
3837 let agent = Agent::from_config(test_config()).await.unwrap();
3838 let opts = SessionOptions::new()
3839 .with_session_store(store.clone())
3840 .with_session_id("stream-attachments-cancel-test")
3841 .with_auto_save(true);
3842 let session = agent
3843 .build_session(
3844 "/tmp/test-stream-attachments-cancel".into(),
3845 Arc::new(CancellableStreamingClient::new("partial attachment answer")),
3846 &opts,
3847 )
3848 .unwrap();
3849 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3850
3851 let (mut rx, handle) = session
3852 .stream_with_attachments("hello", &attachments, None)
3853 .await
3854 .unwrap();
3855 let mut saw_delta = false;
3856 for _ in 0..16 {
3857 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
3858 .await
3859 .expect("stream event before timeout")
3860 .expect("stream should stay open until cancelled");
3861 if matches!(event, AgentEvent::TextDelta { .. }) {
3862 saw_delta = true;
3863 break;
3864 }
3865 }
3866 assert!(saw_delta);
3867 assert!(session.cancel().await);
3868
3869 while rx.recv().await.is_some() {}
3870 handle.await.unwrap();
3871
3872 assert!(session.history().is_empty());
3873 assert!(store
3874 .load("stream-attachments-cancel-test")
3875 .await
3876 .unwrap()
3877 .is_none());
3878 assert_eq!(
3879 session.runs().await[0].status,
3880 crate::run::RunStatus::Cancelled
3881 );
3882 assert!(!session.cancel().await);
3883 }
3884
3885 #[tokio::test]
3886 async fn test_run_handle_cancels_send_with_attachments() {
3887 let agent = Agent::from_config(test_config()).await.unwrap();
3888 let session = Arc::new(
3889 agent
3890 .build_session(
3891 "/tmp/test-send-attachments-run-handle-cancel".into(),
3892 Arc::new(CancellableStreamingClient::new("partial answer")),
3893 &SessionOptions::new(),
3894 )
3895 .unwrap(),
3896 );
3897 let worker_session = Arc::clone(&session);
3898 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3899
3900 let worker = tokio::spawn(async move {
3901 worker_session
3902 .send_with_attachments("hello", &attachments, None)
3903 .await
3904 });
3905
3906 let mut run = None;
3907 for _ in 0..20 {
3908 if let Some(current) = session.current_run().await {
3909 run = Some(current);
3910 break;
3911 }
3912 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3913 }
3914 let run = run.expect("current run should be visible");
3915 assert!(run.cancel().await);
3916
3917 let result = tokio::time::timeout(std::time::Duration::from_secs(1), worker)
3918 .await
3919 .expect("send_with_attachments should stop after cancellation")
3920 .expect("worker should not panic");
3921 assert!(result.is_err());
3922 assert_eq!(run.status().await, Some(crate::run::RunStatus::Cancelled));
3923 assert!(session.history().is_empty());
3924 assert!(!session.cancel().await);
3925 }
3926
3927 #[tokio::test]
3928 async fn test_cancel_run_only_cancels_matching_current_run() {
3929 let agent = Agent::from_config(test_config()).await.unwrap();
3930 let session = Arc::new(
3931 agent
3932 .build_session(
3933 "/tmp/test-cancel-run-by-id".into(),
3934 Arc::new(CancellableStreamingClient::new("partial answer")),
3935 &SessionOptions::new(),
3936 )
3937 .unwrap(),
3938 );
3939 let worker_session = Arc::clone(&session);
3940 let worker = tokio::spawn(async move { worker_session.send("hello", None).await });
3941
3942 let mut run_id = None;
3943 for _ in 0..20 {
3944 if let Some(current) = session.current_run().await {
3945 run_id = Some(current.id().to_string());
3946 break;
3947 }
3948 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
3949 }
3950 let run_id = run_id.expect("current run should be visible");
3951
3952 assert!(!session.cancel_run("stale-run").await);
3953 assert!(session.cancel_run(&run_id).await);
3954
3955 let result = tokio::time::timeout(std::time::Duration::from_secs(1), worker)
3956 .await
3957 .expect("send should stop after cancellation")
3958 .expect("worker should not panic");
3959 assert!(result.is_err());
3960 assert_eq!(
3961 session.run_snapshot(&run_id).await.unwrap().status,
3962 crate::run::RunStatus::Cancelled
3963 );
3964 assert!(!session.cancel_run(&run_id).await);
3965 }
3966
3967 #[tokio::test]
3968 async fn test_send_with_attachments_passes_session_id_to_context_providers() {
3969 let provider = Arc::new(CapturingContextProvider::default());
3970 let agent = Agent::from_config(test_config()).await.unwrap();
3971 let opts = SessionOptions::new()
3972 .with_session_id("attachments-context-session")
3973 .with_context_provider(provider.clone());
3974 let session = agent
3975 .build_session(
3976 "/tmp/test-send-attachments-context".into(),
3977 Arc::new(StaticStreamingClient::new("attachment answer")),
3978 &opts,
3979 )
3980 .unwrap();
3981 let attachments = vec![crate::llm::Attachment::png(vec![1, 2, 3])];
3982
3983 session
3984 .send_with_attachments("hello", &attachments, None)
3985 .await
3986 .unwrap();
3987
3988 let session_ids = provider.session_ids.lock().unwrap();
3989 assert!(!session_ids.is_empty());
3990 assert!(session_ids
3991 .iter()
3992 .all(|id| id.as_deref() == Some("attachments-context-session")));
3993 }
3994
3995 #[tokio::test]
3996 async fn test_send_records_run_snapshot_and_events() {
3997 let agent = Agent::from_config(test_config()).await.unwrap();
3998 let session = agent
3999 .build_session(
4000 "/tmp/test-send-run-store".into(),
4001 Arc::new(StaticStreamingClient::new("run answer")),
4002 &SessionOptions::new(),
4003 )
4004 .unwrap();
4005
4006 let result = session.send("hello", None).await.unwrap();
4007 assert_eq!(result.text, "run answer");
4008
4009 let runs = session.runs().await;
4010 assert_eq!(runs.len(), 1);
4011 assert_eq!(runs[0].status, crate::run::RunStatus::Completed);
4012 assert_eq!(runs[0].result_text.as_deref(), Some("run answer"));
4013
4014 let events = session.run_events(&runs[0].id).await;
4015 assert!(events
4016 .iter()
4017 .any(|record| matches!(record.event, AgentEvent::Start { .. })));
4018 assert!(events
4019 .iter()
4020 .any(|record| matches!(record.event, AgentEvent::End { .. })));
4021 }
4022
4023 #[tokio::test]
4024 async fn test_send_publishes_runtime_events_to_hook_executor() {
4025 let hook = Arc::new(RecordingRuntimeHook::default());
4026 let agent = Agent::from_config(test_config()).await.unwrap();
4027 let opts = SessionOptions::new().with_hook_executor(hook.clone());
4028 let session = agent
4029 .build_session(
4030 "/tmp/test-runtime-event-hook".into(),
4031 Arc::new(StaticStreamingClient::new("hooked answer")),
4032 &opts,
4033 )
4034 .unwrap();
4035
4036 session.send("hello", None).await.unwrap();
4037
4038 let events = hook.events.lock().unwrap();
4039 assert!(events
4040 .iter()
4041 .any(|(_, session_id, event)| session_id == session.id()
4042 && matches!(event, AgentEvent::Start { .. })));
4043 assert!(events
4044 .iter()
4045 .any(|(_, session_id, event)| session_id == session.id()
4046 && matches!(event, AgentEvent::End { .. })));
4047 assert!(events
4048 .iter()
4049 .all(|(run_id, _, _)| run_id.starts_with("run-")));
4050 }
4051
4052 #[tokio::test]
4053 async fn test_stream_exposes_current_run_handle_and_replay() {
4054 let agent = Agent::from_config(test_config()).await.unwrap();
4055 let session = agent
4056 .build_session(
4057 "/tmp/test-stream-run-handle".into(),
4058 Arc::new(CancellableStreamingClient::new("partial answer")),
4059 &SessionOptions::new(),
4060 )
4061 .unwrap();
4062
4063 let (mut rx, handle) = session.stream("hello", None).await.unwrap();
4064 let mut saw_delta = false;
4065 for _ in 0..16 {
4066 let event = tokio::time::timeout(std::time::Duration::from_secs(1), rx.recv())
4067 .await
4068 .expect("stream event before timeout")
4069 .expect("stream emits event");
4070 if matches!(event, AgentEvent::TextDelta { .. }) {
4071 saw_delta = true;
4072 break;
4073 }
4074 }
4075 assert!(saw_delta);
4076
4077 let run = session.current_run().await.expect("current run handle");
4078 assert_eq!(run.session_id(), session.id());
4079 assert!(matches!(
4080 run.status().await,
4081 Some(crate::run::RunStatus::Executing | crate::run::RunStatus::Planning)
4082 ));
4083 assert!(run.cancel().await);
4084
4085 while rx.recv().await.is_some() {}
4086 handle.await.unwrap();
4087
4088 let snapshot = run
4089 .snapshot()
4090 .await
4091 .expect("run snapshot remains replayable");
4092 assert_eq!(snapshot.status, crate::run::RunStatus::Cancelled);
4093 assert!(!run.events().await.is_empty());
4094 }
4095
4096 #[tokio::test]
4097 async fn test_session_options_with_agent_dir() {
4098 let opts = SessionOptions::new()
4099 .with_agent_dir("/tmp/agents")
4100 .with_agent_dir("/tmp/more-agents");
4101 assert_eq!(opts.agent_dirs.len(), 2);
4102 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
4103 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
4104 }
4105
4106 #[test]
4111 fn test_session_options_with_queue_config() {
4112 let qc = SessionQueueConfig::default().with_lane_features();
4113 let opts = SessionOptions::new().with_queue_config(qc.clone());
4114 assert!(opts.queue_config.is_some());
4115
4116 let config = opts.queue_config.unwrap();
4117 assert!(config.enable_dlq);
4118 assert!(config.enable_metrics);
4119 assert!(config.enable_alerts);
4120 assert_eq!(config.default_timeout_ms, Some(60_000));
4121 }
4122
4123 #[tokio::test]
4124 async fn test_session_uses_single_delegation_tool_surface() {
4125 let agent = Agent::from_config(test_config()).await.unwrap();
4126 let session = agent
4127 .session("/tmp/test-workspace-delegation-tools", None)
4128 .unwrap();
4129 let names = session.tool_names();
4130
4131 assert!(names.contains(&"task".to_string()));
4132 assert!(names.contains(&"parallel_task".to_string()));
4133 assert!(!names.contains(&"run_team".to_string()));
4134 }
4135
4136 #[tokio::test(flavor = "multi_thread")]
4137 async fn test_session_with_queue_config() {
4138 let agent = Agent::from_config(test_config()).await.unwrap();
4139 let qc = SessionQueueConfig::default();
4140 let opts = SessionOptions::new().with_queue_config(qc);
4141 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
4142 assert!(session.is_ok());
4143 let session = session.unwrap();
4144 assert!(session.has_queue());
4145 }
4146
4147 #[tokio::test]
4148 async fn test_session_without_queue_config() {
4149 let agent = Agent::from_config(test_config()).await.unwrap();
4150 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
4151 assert!(!session.has_queue());
4152 }
4153
4154 #[tokio::test]
4155 async fn test_session_queue_stats_without_queue() {
4156 let agent = Agent::from_config(test_config()).await.unwrap();
4157 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
4158 let stats = session.queue_stats().await;
4159 assert_eq!(stats.total_pending, 0);
4161 assert_eq!(stats.total_active, 0);
4162 }
4163
4164 #[tokio::test(flavor = "multi_thread")]
4165 async fn test_session_queue_stats_with_queue() {
4166 let agent = Agent::from_config(test_config()).await.unwrap();
4167 let qc = SessionQueueConfig::default();
4168 let opts = SessionOptions::new().with_queue_config(qc);
4169 let session = agent
4170 .session("/tmp/test-workspace-qstats", Some(opts))
4171 .unwrap();
4172 let stats = session.queue_stats().await;
4173 assert_eq!(stats.total_pending, 0);
4175 assert_eq!(stats.total_active, 0);
4176 }
4177
4178 #[tokio::test(flavor = "multi_thread")]
4179 async fn test_session_pending_external_tasks_empty() {
4180 let agent = Agent::from_config(test_config()).await.unwrap();
4181 let qc = SessionQueueConfig::default();
4182 let opts = SessionOptions::new().with_queue_config(qc);
4183 let session = agent
4184 .session("/tmp/test-workspace-ext", Some(opts))
4185 .unwrap();
4186 let tasks = session.pending_external_tasks().await;
4187 assert!(tasks.is_empty());
4188 }
4189
4190 #[tokio::test(flavor = "multi_thread")]
4191 async fn test_session_dead_letters_empty() {
4192 let agent = Agent::from_config(test_config()).await.unwrap();
4193 let qc = SessionQueueConfig::default().with_dlq(Some(100));
4194 let opts = SessionOptions::new().with_queue_config(qc);
4195 let session = agent
4196 .session("/tmp/test-workspace-dlq", Some(opts))
4197 .unwrap();
4198 let dead = session.dead_letters().await;
4199 assert!(dead.is_empty());
4200 }
4201
4202 #[tokio::test(flavor = "multi_thread")]
4203 async fn test_session_queue_metrics_disabled() {
4204 let agent = Agent::from_config(test_config()).await.unwrap();
4205 let qc = SessionQueueConfig::default();
4207 let opts = SessionOptions::new().with_queue_config(qc);
4208 let session = agent
4209 .session("/tmp/test-workspace-nomet", Some(opts))
4210 .unwrap();
4211 let metrics = session.queue_metrics().await;
4212 assert!(metrics.is_none());
4213 }
4214
4215 #[tokio::test(flavor = "multi_thread")]
4216 async fn test_session_queue_metrics_enabled() {
4217 let agent = Agent::from_config(test_config()).await.unwrap();
4218 let qc = SessionQueueConfig::default().with_metrics();
4219 let opts = SessionOptions::new().with_queue_config(qc);
4220 let session = agent
4221 .session("/tmp/test-workspace-met", Some(opts))
4222 .unwrap();
4223 let metrics = session.queue_metrics().await;
4224 assert!(metrics.is_some());
4225 }
4226
4227 #[tokio::test(flavor = "multi_thread")]
4228 async fn test_session_set_lane_handler() {
4229 let agent = Agent::from_config(test_config()).await.unwrap();
4230 let qc = SessionQueueConfig::default();
4231 let opts = SessionOptions::new().with_queue_config(qc);
4232 let session = agent
4233 .session("/tmp/test-workspace-handler", Some(opts))
4234 .unwrap();
4235
4236 session
4238 .set_lane_handler(
4239 SessionLane::Execute,
4240 LaneHandlerConfig {
4241 mode: crate::queue::TaskHandlerMode::External,
4242 timeout_ms: 30_000,
4243 },
4244 )
4245 .await;
4246
4247 }
4250
4251 #[tokio::test(flavor = "multi_thread")]
4256 async fn test_session_has_id() {
4257 let agent = Agent::from_config(test_config()).await.unwrap();
4258 let session = agent.session("/tmp/test-ws-id", None).unwrap();
4259 assert!(!session.session_id().is_empty());
4261 assert_eq!(session.session_id().len(), 36); }
4263
4264 #[tokio::test(flavor = "multi_thread")]
4265 async fn test_session_explicit_id() {
4266 let agent = Agent::from_config(test_config()).await.unwrap();
4267 let opts = SessionOptions::new().with_session_id("my-session-42");
4268 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
4269 assert_eq!(session.session_id(), "my-session-42");
4270 }
4271
4272 #[tokio::test(flavor = "multi_thread")]
4273 async fn test_session_artifact_store_limits_option() {
4274 let agent = Agent::from_config(test_config()).await.unwrap();
4275 let opts =
4276 SessionOptions::new().with_artifact_store_limits(crate::tools::ArtifactStoreLimits {
4277 max_artifacts: 3,
4278 max_bytes: 4096,
4279 });
4280 let session = agent
4281 .session("/tmp/test-ws-artifact-limits", Some(opts))
4282 .unwrap();
4283
4284 let limits = session.tool_executor.artifact_store().limits();
4285 assert_eq!(limits.max_artifacts, 3);
4286 assert_eq!(limits.max_bytes, 4096);
4287 }
4288
4289 #[tokio::test(flavor = "multi_thread")]
4290 async fn test_session_save_no_store() {
4291 let agent = Agent::from_config(test_config()).await.unwrap();
4292 let session = agent.session("/tmp/test-ws-save", None).unwrap();
4293 session.save().await.unwrap();
4295 }
4296
4297 #[tokio::test(flavor = "multi_thread")]
4298 async fn test_session_save_and_load() {
4299 let store = Arc::new(crate::store::MemorySessionStore::new());
4300 let agent = Agent::from_config(test_config()).await.unwrap();
4301
4302 let opts = SessionOptions::new()
4303 .with_session_store(store.clone())
4304 .with_session_id("persist-test");
4305 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
4306
4307 session.save().await.unwrap();
4309
4310 assert!(store.exists("persist-test").await.unwrap());
4312
4313 let data = store.load("persist-test").await.unwrap().unwrap();
4314 assert_eq!(data.id, "persist-test");
4315 assert!(data.messages.is_empty());
4316 }
4317
4318 #[tokio::test(flavor = "multi_thread")]
4319 async fn test_session_save_with_history() {
4320 let store = Arc::new(crate::store::MemorySessionStore::new());
4321 let agent = Agent::from_config(test_config()).await.unwrap();
4322
4323 let opts = SessionOptions::new()
4324 .with_session_store(store.clone())
4325 .with_session_id("history-test");
4326 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
4327
4328 {
4330 let mut h = session.history.write().unwrap();
4331 h.push(Message::user("Hello"));
4332 h.push(Message::user("How are you?"));
4333 }
4334
4335 session.save().await.unwrap();
4336
4337 let data = store.load("history-test").await.unwrap().unwrap();
4338 assert_eq!(data.messages.len(), 2);
4339 }
4340
4341 #[tokio::test(flavor = "multi_thread")]
4342 async fn test_resume_session() {
4343 let store = Arc::new(crate::store::MemorySessionStore::new());
4344 let agent = Agent::from_config(test_config()).await.unwrap();
4345
4346 let opts = SessionOptions::new()
4348 .with_session_store(store.clone())
4349 .with_session_id("resume-test");
4350 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
4351 {
4352 let mut h = session.history.write().unwrap();
4353 h.push(Message::user("What is Rust?"));
4354 h.push(Message::user("Tell me more"));
4355 }
4356 session.save().await.unwrap();
4357
4358 let opts2 = SessionOptions::new().with_session_store(store.clone());
4360 let resumed = agent.resume_session("resume-test", opts2).unwrap();
4361
4362 assert_eq!(resumed.session_id(), "resume-test");
4363 let history = resumed.history();
4364 assert_eq!(history.len(), 2);
4365 assert_eq!(history[0].text(), "What is Rust?");
4366 }
4367
4368 #[tokio::test(flavor = "multi_thread")]
4369 async fn test_resume_session_restores_artifacts() {
4370 let store = Arc::new(crate::store::MemorySessionStore::new());
4371 let agent = Agent::from_config(test_config()).await.unwrap();
4372
4373 let opts = SessionOptions::new()
4374 .with_session_store(store.clone())
4375 .with_session_id("resume-artifacts-test");
4376 let session = agent.session("/tmp/test-ws-artifacts", Some(opts)).unwrap();
4377 session
4378 .tool_executor
4379 .artifact_store()
4380 .put(crate::tools::ToolArtifact {
4381 artifact_id: "tool-output:test:a".to_string(),
4382 artifact_uri: "a3s://tool-output/test/a".to_string(),
4383 tool_name: "test".to_string(),
4384 content: "artifact content".to_string(),
4385 original_bytes: 16,
4386 shown_bytes: 4,
4387 });
4388
4389 session.save().await.unwrap();
4390 let opts2 = SessionOptions::new().with_session_store(store.clone());
4391 let resumed = agent
4392 .resume_session("resume-artifacts-test", opts2)
4393 .unwrap();
4394
4395 let artifact = resumed
4396 .get_artifact("a3s://tool-output/test/a")
4397 .expect("artifact");
4398 assert_eq!(artifact.content, "artifact content");
4399 }
4400
4401 #[tokio::test(flavor = "multi_thread")]
4402 async fn test_resume_session_restores_trace_events() {
4403 let store = Arc::new(crate::store::MemorySessionStore::new());
4404 let agent = Agent::from_config(test_config()).await.unwrap();
4405 let event = crate::trace::TraceEvent::tool_execution(
4406 "read",
4407 true,
4408 0,
4409 std::time::Duration::from_millis(3),
4410 32,
4411 Some(&serde_json::json!({
4412 "artifact": {
4413 "artifact_uri": "a3s://tool-output/read/abc"
4414 }
4415 })),
4416 );
4417
4418 let opts = SessionOptions::new()
4419 .with_session_store(store.clone())
4420 .with_session_id("resume-trace-test");
4421 let session = agent.session("/tmp/test-ws-trace", Some(opts)).unwrap();
4422 session.trace_sink.replace_events(vec![event.clone()]);
4423 session.save().await.unwrap();
4424
4425 let opts2 = SessionOptions::new().with_session_store(store.clone());
4426 let resumed = agent.resume_session("resume-trace-test", opts2).unwrap();
4427
4428 assert_eq!(resumed.trace_events(), vec![event]);
4429 }
4430
4431 #[tokio::test(flavor = "multi_thread")]
4432 async fn test_resume_session_restores_run_records() {
4433 let store = Arc::new(crate::store::MemorySessionStore::new());
4434 let agent = Agent::from_config(test_config()).await.unwrap();
4435
4436 let opts = SessionOptions::new()
4437 .with_session_store(store.clone())
4438 .with_session_id("resume-runs-test");
4439 let session = agent.session("/tmp/test-ws-runs", Some(opts)).unwrap();
4440 let run = session
4441 .run_store
4442 .create_run(session.session_id(), "persist run")
4443 .await;
4444 session
4445 .run_store
4446 .record_event(
4447 &run.id,
4448 AgentEvent::Start {
4449 prompt: "persist run".to_string(),
4450 },
4451 )
4452 .await;
4453 session.save().await.unwrap();
4454
4455 let opts2 = SessionOptions::new().with_session_store(store.clone());
4456 let resumed = agent.resume_session("resume-runs-test", opts2).unwrap();
4457
4458 let runs = resumed.runs().await;
4459 assert_eq!(runs.len(), 1);
4460 assert_eq!(runs[0].prompt, "persist run");
4461 assert_eq!(resumed.run_events(&run.id).await.len(), 1);
4462 }
4463
4464 #[tokio::test(flavor = "multi_thread")]
4465 async fn test_resume_session_restores_verification_reports() {
4466 let store = Arc::new(crate::store::MemorySessionStore::new());
4467 let agent = Agent::from_config(test_config()).await.unwrap();
4468 let report = crate::verification::VerificationReport::new(
4469 "program:test",
4470 vec![crate::verification::VerificationCheck::required(
4471 "check:test",
4472 "test",
4473 "Run tests",
4474 )
4475 .with_status(crate::verification::VerificationStatus::Passed)],
4476 );
4477
4478 let opts = SessionOptions::new()
4479 .with_session_store(store.clone())
4480 .with_session_id("resume-verification-test");
4481 let session = agent
4482 .session("/tmp/test-ws-verification", Some(opts))
4483 .unwrap();
4484 session.record_verification_reports([report.clone()]);
4485 session.save().await.unwrap();
4486
4487 let opts2 = SessionOptions::new().with_session_store(store.clone());
4488 let resumed = agent
4489 .resume_session("resume-verification-test", opts2)
4490 .unwrap();
4491
4492 assert_eq!(resumed.verification_reports(), vec![report]);
4493 assert_eq!(
4494 resumed.verification_summary().status,
4495 crate::verification::VerificationStatus::Passed
4496 );
4497 assert!(resumed
4498 .verification_summary_text()
4499 .contains("Verification passed"));
4500 }
4501
4502 #[tokio::test(flavor = "multi_thread")]
4503 async fn test_verify_commands_builds_report_from_bash_results() {
4504 let temp_dir = tempfile::tempdir().unwrap();
4505 let agent = Agent::from_config(test_config()).await.unwrap();
4506 let session = agent
4507 .session(temp_dir.path().display().to_string(), None)
4508 .unwrap();
4509 let commands = vec![
4510 crate::verification::VerificationCommand::required(
4511 "check:smoke",
4512 "smoke",
4513 "Run smoke command",
4514 "printf ok",
4515 ),
4516 crate::verification::VerificationCommand::required(
4517 "check:failure",
4518 "smoke",
4519 "Run failing command",
4520 "exit 7",
4521 ),
4522 ];
4523
4524 let report = session.verify_commands("turn", &commands).await.unwrap();
4525
4526 assert_eq!(report.subject, "turn");
4527 assert_eq!(
4528 report.status,
4529 crate::verification::VerificationStatus::Failed
4530 );
4531 assert_eq!(
4532 report.checks[0].status,
4533 crate::verification::VerificationStatus::Passed
4534 );
4535 assert_eq!(
4536 report.checks[1].status,
4537 crate::verification::VerificationStatus::Failed
4538 );
4539 assert_eq!(
4540 report.checks[1].residual_risk.as_deref(),
4541 Some("verification command exited with code 7: exit 7")
4542 );
4543 assert_eq!(session.verification_reports(), vec![report]);
4544 assert_eq!(
4545 session.verification_summary().status,
4546 crate::verification::VerificationStatus::Failed
4547 );
4548 }
4549
4550 #[tokio::test(flavor = "multi_thread")]
4551 async fn test_session_verification_presets_reflect_workspace() {
4552 let temp_dir = tempfile::tempdir().unwrap();
4553 std::fs::write(
4554 temp_dir.path().join("package.json"),
4555 r#"{"scripts":{"test":"vitest","typecheck":"tsc --noEmit"}}"#,
4556 )
4557 .unwrap();
4558 let agent = Agent::from_config(test_config()).await.unwrap();
4559 let session = agent
4560 .session(temp_dir.path().display().to_string(), None)
4561 .unwrap();
4562
4563 let presets = session.verification_presets();
4564
4565 assert_eq!(presets.len(), 1);
4566 assert_eq!(presets[0].project_kind, "node");
4567 assert_eq!(presets[0].commands[0].command, "npm test");
4568 assert_eq!(presets[0].commands[1].command, "npm run typecheck");
4569 }
4570
4571 #[tokio::test(flavor = "multi_thread")]
4572 async fn test_resume_session_not_found() {
4573 let store = Arc::new(crate::store::MemorySessionStore::new());
4574 let agent = Agent::from_config(test_config()).await.unwrap();
4575
4576 let opts = SessionOptions::new().with_session_store(store.clone());
4577 let result = agent.resume_session("nonexistent", opts);
4578 assert!(result.is_err());
4579 assert!(result.unwrap_err().to_string().contains("not found"));
4580 }
4581
4582 #[tokio::test(flavor = "multi_thread")]
4583 async fn test_resume_session_no_store() {
4584 let agent = Agent::from_config(test_config()).await.unwrap();
4585 let opts = SessionOptions::new();
4586 let result = agent.resume_session("any-id", opts);
4587 assert!(result.is_err());
4588 assert!(result.unwrap_err().to_string().contains("session_store"));
4589 }
4590
4591 #[tokio::test(flavor = "multi_thread")]
4592 async fn test_file_session_store_persistence() {
4593 let dir = tempfile::TempDir::new().unwrap();
4594 let store = Arc::new(
4595 crate::store::FileSessionStore::new(dir.path())
4596 .await
4597 .unwrap(),
4598 );
4599 let agent = Agent::from_config(test_config()).await.unwrap();
4600
4601 let opts = SessionOptions::new()
4603 .with_session_store(store.clone())
4604 .with_session_id("file-persist");
4605 let session = agent
4606 .session("/tmp/test-ws-file-persist", Some(opts))
4607 .unwrap();
4608 {
4609 let mut h = session.history.write().unwrap();
4610 h.push(Message::user("test message"));
4611 }
4612 session.save().await.unwrap();
4613
4614 let store2 = Arc::new(
4616 crate::store::FileSessionStore::new(dir.path())
4617 .await
4618 .unwrap(),
4619 );
4620 let data = store2.load("file-persist").await.unwrap().unwrap();
4621 assert_eq!(data.messages.len(), 1);
4622 }
4623
4624 #[tokio::test(flavor = "multi_thread")]
4625 async fn test_session_options_builders() {
4626 let opts = SessionOptions::new()
4627 .with_session_id("test-id")
4628 .with_auto_save(true);
4629 assert_eq!(opts.session_id, Some("test-id".to_string()));
4630 assert!(opts.auto_save);
4631 }
4632
4633 #[tokio::test(flavor = "multi_thread")]
4638 async fn test_session_with_memory_store() {
4639 use a3s_memory::InMemoryStore;
4640 let store = Arc::new(InMemoryStore::new());
4641 let agent = Agent::from_config(test_config()).await.unwrap();
4642 let opts = SessionOptions::new().with_memory(store);
4643 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
4644 assert!(session.memory().is_some());
4645 }
4646
4647 #[tokio::test(flavor = "multi_thread")]
4648 async fn test_session_without_memory_store() {
4649 let agent = Agent::from_config(test_config()).await.unwrap();
4650 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
4651 assert!(session.memory().is_none());
4652 }
4653
4654 #[tokio::test(flavor = "multi_thread")]
4655 async fn test_session_memory_wired_into_config() {
4656 use a3s_memory::InMemoryStore;
4657 let store = Arc::new(InMemoryStore::new());
4658 let agent = Agent::from_config(test_config()).await.unwrap();
4659 let opts = SessionOptions::new().with_memory(store);
4660 let session = agent
4661 .session("/tmp/test-ws-mem-config", Some(opts))
4662 .unwrap();
4663 assert!(session.memory().is_some());
4665 }
4666
4667 #[tokio::test(flavor = "multi_thread")]
4668 async fn test_session_with_file_memory() {
4669 let dir = tempfile::TempDir::new().unwrap();
4670 let agent = Agent::from_config(test_config()).await.unwrap();
4671 let opts = SessionOptions::new().with_file_memory(dir.path());
4672 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
4673 assert!(session.memory().is_some());
4674 }
4675
4676 #[tokio::test(flavor = "multi_thread")]
4677 async fn test_memory_remember_and_recall() {
4678 use a3s_memory::InMemoryStore;
4679 let store = Arc::new(InMemoryStore::new());
4680 let agent = Agent::from_config(test_config()).await.unwrap();
4681 let opts = SessionOptions::new().with_memory(store);
4682 let session = agent
4683 .session("/tmp/test-ws-mem-recall", Some(opts))
4684 .unwrap();
4685
4686 let memory = session.memory().unwrap();
4687 memory
4688 .remember_success("write a file", &["write".to_string()], "done")
4689 .await
4690 .unwrap();
4691
4692 let results = memory.recall_similar("write", 5).await.unwrap();
4693 assert!(!results.is_empty());
4694 let stats = memory.stats().await.unwrap();
4695 assert_eq!(stats.long_term_count, 1);
4696 }
4697
4698 #[tokio::test(flavor = "multi_thread")]
4703 async fn test_session_tool_timeout_configured() {
4704 let agent = Agent::from_config(test_config()).await.unwrap();
4705 let opts = SessionOptions::new().with_tool_timeout(5000);
4706 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
4707 assert!(!session.id().is_empty());
4708 }
4709
4710 #[tokio::test(flavor = "multi_thread")]
4715 async fn test_session_without_queue_builds_ok() {
4716 let agent = Agent::from_config(test_config()).await.unwrap();
4717 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
4718 assert!(!session.id().is_empty());
4719 }
4720
4721 #[tokio::test(flavor = "multi_thread")]
4726 async fn test_concurrent_history_reads() {
4727 let agent = Agent::from_config(test_config()).await.unwrap();
4728 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
4729
4730 let handles: Vec<_> = (0..10)
4731 .map(|_| {
4732 let s = Arc::clone(&session);
4733 tokio::spawn(async move { s.history().len() })
4734 })
4735 .collect();
4736
4737 for h in handles {
4738 h.await.unwrap();
4739 }
4740 }
4741
4742 #[tokio::test(flavor = "multi_thread")]
4747 async fn test_session_no_init_warning_without_file_memory() {
4748 let agent = Agent::from_config(test_config()).await.unwrap();
4749 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
4750 assert!(session.init_warning().is_none());
4751 }
4752
4753 #[tokio::test(flavor = "multi_thread")]
4754 async fn test_register_agent_dir_loads_agents_into_live_session() {
4755 let temp_dir = tempfile::tempdir().unwrap();
4756
4757 std::fs::write(
4759 temp_dir.path().join("my-agent.yaml"),
4760 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
4761 )
4762 .unwrap();
4763
4764 let agent = Agent::from_config(test_config()).await.unwrap();
4765 let session = agent.session(".", None).unwrap();
4766
4767 assert!(!session.agent_registry.exists("my-dynamic-agent"));
4769
4770 let count = session.register_agent_dir(temp_dir.path());
4771 assert_eq!(count, 1);
4772 assert!(session.agent_registry.exists("my-dynamic-agent"));
4773 }
4774
4775 #[tokio::test(flavor = "multi_thread")]
4776 async fn test_register_agent_dir_empty_dir_returns_zero() {
4777 let temp_dir = tempfile::tempdir().unwrap();
4778 let agent = Agent::from_config(test_config()).await.unwrap();
4779 let session = agent.session(".", None).unwrap();
4780 let count = session.register_agent_dir(temp_dir.path());
4781 assert_eq!(count, 0);
4782 }
4783
4784 #[tokio::test(flavor = "multi_thread")]
4785 async fn test_register_agent_dir_nonexistent_returns_zero() {
4786 let agent = Agent::from_config(test_config()).await.unwrap();
4787 let session = agent.session(".", None).unwrap();
4788 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
4789 assert_eq!(count, 0);
4790 }
4791
4792 #[tokio::test(flavor = "multi_thread")]
4793 async fn test_session_with_mcp_manager_builds_ok() {
4794 use crate::mcp::manager::McpManager;
4795 let mcp = Arc::new(McpManager::new());
4796 let agent = Agent::from_config(test_config()).await.unwrap();
4797 let opts = SessionOptions::new().with_mcp(mcp);
4798 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
4800 assert!(!session.id().is_empty());
4801 }
4802
4803 #[test]
4804 fn test_session_command_is_available_from_queue_module() {
4805 use crate::queue::SessionCommand;
4807 let _ = std::marker::PhantomData::<Box<dyn SessionCommand>>;
4808 }
4809}