1use crate::agent::{AgentConfig, AgentEvent, AgentLoop, AgentResult};
20use crate::commands::{
21 CommandAction, CommandContext, CommandRegistry, CronCancelCommand, CronListCommand, LoopCommand,
22};
23use crate::config::CodeConfig;
24use crate::error::{read_or_recover, write_or_recover, CodeError, Result};
25use crate::hitl::PendingConfirmationInfo;
26use crate::llm::{LlmClient, Message};
27use crate::prompts::{PlanningMode, SystemPromptSlots};
28use crate::queue::{
29 ExternalTask, ExternalTaskResult, LaneHandlerConfig, SessionLane, SessionQueueConfig,
30 SessionQueueStats,
31};
32use crate::scheduler::{CronScheduler, ScheduledFire};
33use crate::session_lane_queue::SessionLaneQueue;
34use crate::task::{ProgressTracker, TaskManager};
35use crate::text::truncate_utf8;
36use crate::tools::{ToolContext, ToolExecutor};
37use a3s_lane::{DeadLetter, MetricsSnapshot};
38use a3s_memory::{FileMemoryStore, MemoryStore};
39use anyhow::Context;
40use std::collections::HashMap;
41use std::path::{Path, PathBuf};
42use std::sync::atomic::{AtomicBool, Ordering};
43use std::sync::{Arc, RwLock};
44use tokio::sync::{broadcast, mpsc};
45use tokio::task::JoinHandle;
46
47fn safe_canonicalize(path: &Path) -> PathBuf {
50 match std::fs::canonicalize(path) {
51 Ok(p) => strip_unc_prefix(p),
52 Err(_) => path.to_path_buf(),
53 }
54}
55
/// Removes the Windows verbatim prefix (`\\?\`) from `path`.
///
/// * `\\?\C:\dir` becomes `C:\dir`.
/// * `\\?\UNC\server\share` becomes `\\server\share`. Merely dropping `\\?\`
///   would leave `UNC\server\share`, which is a relative path and no longer
///   names the share.
///
/// On non-Windows targets the prefix handling is compiled out and the path is
/// returned untouched. The prefix check operates on a lossy UTF-8 view of the
/// path, exactly as before.
fn strip_unc_prefix(path: PathBuf) -> PathBuf {
    #[cfg(windows)]
    {
        let s = path.to_string_lossy();
        // Verbatim UNC must be checked first: it also starts with `\\?\`.
        if let Some(share) = s.strip_prefix(r"\\?\UNC\") {
            return PathBuf::from(format!(r"\\{share}"));
        }
        if let Some(stripped) = s.strip_prefix(r"\\?\") {
            return PathBuf::from(stripped);
        }
    }
    path
}
68
/// Result record for a single tool call.
///
/// NOTE(review): the producers and consumers of this type are outside this
/// view; the field notes below are derived from names and types only.
#[derive(Debug, Clone)]
pub struct ToolCallResult {
    /// Name of the tool.
    pub name: String,
    /// Textual output of the call.
    pub output: String,
    /// Exit code of the call — presumably `0` means success; confirm against
    /// the code that builds this value.
    pub exit_code: i32,
    /// Optional structured metadata as an arbitrary JSON value.
    pub metadata: Option<serde_json::Value>,
}
81
/// Per-session overrides and extensions applied when a session is created
/// from an [`Agent`].
///
/// Every field defaults to "unset" (`None`, empty, `false`, or the type's
/// `Default`); the `with_*` builder methods set them fluently.
#[derive(Clone, Default)]
pub struct SessionOptions {
    /// Model override. It is split on the first `/` into provider and model
    /// when the session's LLM client is resolved; when `None` the agent's
    /// configured `default_model` is used.
    pub model: Option<String>,
    /// Extra directories scanned for agent definitions, in addition to the
    /// directories from the agent-level config.
    pub agent_dirs: Vec<PathBuf>,
    /// When set, a session lane queue is created for the session.
    pub queue_config: Option<SessionQueueConfig>,
    /// Security provider copied into the session's agent config.
    pub security_provider: Option<Arc<dyn crate::security::SecurityProvider>>,
    /// Context providers copied into the session's agent config.
    pub context_providers: Vec<Arc<dyn crate::context::ContextProvider>>,
    /// Confirmation provider copied into the session's agent config.
    pub confirmation_manager: Option<Arc<dyn crate::hitl::ConfirmationProvider>>,
    /// Permission checker copied into the session's agent config.
    pub permission_checker: Option<Arc<dyn crate::permissions::PermissionChecker>>,
    /// Planning mode copied into the session's agent config.
    pub planning_mode: PlanningMode,
    /// Goal-tracking flag copied into the session's agent config.
    pub goal_tracking: bool,
    /// Directories from which additional skills are loaded for this session.
    pub skill_dirs: Vec<PathBuf>,
    /// Skills from this registry are registered on top of a fork of the
    /// agent-level registry.
    pub skill_registry: Option<Arc<crate::skills::SkillRegistry>>,
    /// Explicit memory store; takes precedence over `file_memory_dir`.
    pub memory_store: Option<Arc<dyn MemoryStore>>,
    /// Directory for a file-backed memory store, consulted only when
    /// `memory_store` is `None`.
    pub(crate) file_memory_dir: Option<PathBuf>,
    /// Explicit session store; when `None` the config's `sessions_dir` is
    /// consulted instead.
    pub session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// Session identifier; a fresh UUID v4 is generated when `None`.
    pub session_id: Option<String>,
    /// Auto-save flag. NOTE(review): its consumer is outside this view.
    pub auto_save: bool,
    /// Overrides the agent-level `max_parse_retries` when set.
    pub max_parse_retries: Option<u32>,
    /// Tool timeout in milliseconds; falls back to the agent-level value.
    pub tool_timeout_ms: Option<u64>,
    /// Overrides the agent-level `circuit_breaker_threshold` when set.
    pub circuit_breaker_threshold: Option<u32>,
    /// Sandbox configuration. On its own it only triggers a warning during
    /// session construction; a `sandbox_handle` is what actually installs a
    /// sandbox.
    pub sandbox_config: Option<crate::sandbox::SandboxConfig>,
    /// Sandbox implementation installed on the tool registry and tool context.
    pub sandbox_handle: Option<Arc<dyn crate::sandbox::BashSandbox>>,
    /// Auto-compaction flag copied into the session's agent config.
    pub auto_compact: bool,
    /// Auto-compaction threshold; defaults to
    /// `crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD` when `None`.
    pub auto_compact_threshold: Option<f32>,
    /// Overrides the agent-level `continuation_enabled` when set.
    pub continuation_enabled: Option<bool>,
    /// Overrides the agent-level `max_continuation_turns` when set.
    pub max_continuation_turns: Option<u32>,
    /// Session-level MCP manager. If the agent has a global manager, the
    /// session's servers are registered on the global one and the global
    /// manager is used for the session.
    pub mcp_manager: Option<Arc<crate::mcp::manager::McpManager>>,
    /// LLM temperature override; only applied when `model` is also set.
    pub temperature: Option<f32>,
    /// LLM thinking-budget override; only applied when `model` is also set.
    pub thinking_budget: Option<usize>,
    /// Overrides the agent-level `max_tool_rounds` when set.
    pub max_tool_rounds: Option<usize>,
    /// System prompt slots; the agent-level slots are used when `None`.
    pub prompt_slots: Option<SystemPromptSlots>,
    /// Optional hook executor. NOTE(review): its consumer is outside this view.
    pub hook_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Document parser registry handed to the registry resolver during
    /// session construction.
    pub document_parser_registry: Option<Arc<crate::document_parser::DocumentParserRegistry>>,
    /// Document pipeline registry; a default one is built from the document
    /// parser config when `None`.
    pub(crate) document_pipeline_registry:
        Option<Arc<crate::document_pipeline::DocumentPipelineRegistry>>,
    /// OCR provider handed to the document parser registry resolver.
    pub document_ocr_provider: Option<Arc<dyn crate::document_ocr::DocumentOcrProvider>>,
    /// Plugins loaded into the session's tool registry, in order.
    pub plugins: Vec<std::sync::Arc<dyn crate::plugin::Plugin>>,
}
218
219impl std::fmt::Debug for SessionOptions {
220 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
221 f.debug_struct("SessionOptions")
222 .field("model", &self.model)
223 .field("agent_dirs", &self.agent_dirs)
224 .field("skill_dirs", &self.skill_dirs)
225 .field("queue_config", &self.queue_config)
226 .field("security_provider", &self.security_provider.is_some())
227 .field("context_providers", &self.context_providers.len())
228 .field("confirmation_manager", &self.confirmation_manager.is_some())
229 .field("permission_checker", &self.permission_checker.is_some())
230 .field("planning_mode", &self.planning_mode)
231 .field("goal_tracking", &self.goal_tracking)
232 .field(
233 "skill_registry",
234 &self
235 .skill_registry
236 .as_ref()
237 .map(|r| format!("{} skills", r.len())),
238 )
239 .field("memory_store", &self.memory_store.is_some())
240 .field("session_store", &self.session_store.is_some())
241 .field("session_id", &self.session_id)
242 .field("auto_save", &self.auto_save)
243 .field("max_parse_retries", &self.max_parse_retries)
244 .field("tool_timeout_ms", &self.tool_timeout_ms)
245 .field("circuit_breaker_threshold", &self.circuit_breaker_threshold)
246 .field("sandbox_config", &self.sandbox_config)
247 .field("auto_compact", &self.auto_compact)
248 .field("auto_compact_threshold", &self.auto_compact_threshold)
249 .field("continuation_enabled", &self.continuation_enabled)
250 .field("max_continuation_turns", &self.max_continuation_turns)
251 .field(
252 "plugins",
253 &self.plugins.iter().map(|p| p.name()).collect::<Vec<_>>(),
254 )
255 .field("mcp_manager", &self.mcp_manager.is_some())
256 .field("temperature", &self.temperature)
257 .field("thinking_budget", &self.thinking_budget)
258 .field("max_tool_rounds", &self.max_tool_rounds)
259 .field("prompt_slots", &self.prompt_slots.is_some())
260 .field(
261 "document_pipeline_registry",
262 &self.document_pipeline_registry.is_some(),
263 )
264 .field(
265 "document_ocr_provider",
266 &self.document_ocr_provider.is_some(),
267 )
268 .finish()
269 }
270}
271
272impl SessionOptions {
273 pub fn new() -> Self {
274 Self::default()
275 }
276
277 pub fn with_plugin(mut self, plugin: impl crate::plugin::Plugin + 'static) -> Self {
282 self.plugins.push(std::sync::Arc::new(plugin));
283 self
284 }
285
286 pub fn with_model(mut self, model: impl Into<String>) -> Self {
287 self.model = Some(model.into());
288 self
289 }
290
291 pub fn with_document_parser_registry(
293 mut self,
294 registry: Arc<crate::document_parser::DocumentParserRegistry>,
295 ) -> Self {
296 self.document_parser_registry = Some(registry);
297 self
298 }
299
300 pub fn with_document_ocr_provider(
302 mut self,
303 provider: Arc<dyn crate::document_ocr::DocumentOcrProvider>,
304 ) -> Self {
305 self.document_ocr_provider = Some(provider);
306 self
307 }
308
309 pub fn with_agent_dir(mut self, dir: impl Into<PathBuf>) -> Self {
310 self.agent_dirs.push(dir.into());
311 self
312 }
313
314 pub fn with_queue_config(mut self, config: SessionQueueConfig) -> Self {
315 self.queue_config = Some(config);
316 self
317 }
318
319 pub fn with_default_security(mut self) -> Self {
321 self.security_provider = Some(Arc::new(crate::security::DefaultSecurityProvider::new()));
322 self
323 }
324
325 pub fn with_security_provider(
327 mut self,
328 provider: Arc<dyn crate::security::SecurityProvider>,
329 ) -> Self {
330 self.security_provider = Some(provider);
331 self
332 }
333
334 pub fn with_fs_context(mut self, root_path: impl Into<PathBuf>) -> Self {
336 let config = crate::context::FileSystemContextConfig::new(root_path);
337 self.context_providers
338 .push(Arc::new(crate::context::FileSystemContextProvider::new(
339 config,
340 )));
341 self
342 }
343
344 pub fn with_context_provider(
346 mut self,
347 provider: Arc<dyn crate::context::ContextProvider>,
348 ) -> Self {
349 self.context_providers.push(provider);
350 self
351 }
352
353 pub fn with_confirmation_manager(
355 mut self,
356 manager: Arc<dyn crate::hitl::ConfirmationProvider>,
357 ) -> Self {
358 self.confirmation_manager = Some(manager);
359 self
360 }
361
362 pub fn with_permission_checker(
364 mut self,
365 checker: Arc<dyn crate::permissions::PermissionChecker>,
366 ) -> Self {
367 self.permission_checker = Some(checker);
368 self
369 }
370
371 pub fn with_permissive_policy(self) -> Self {
378 self.with_permission_checker(Arc::new(crate::permissions::PermissionPolicy::permissive()))
379 }
380
381 pub fn with_planning_mode(mut self, mode: PlanningMode) -> Self {
383 self.planning_mode = mode;
384 self
385 }
386
387 pub fn with_planning(mut self, enabled: bool) -> Self {
389 self.planning_mode = if enabled {
390 PlanningMode::Enabled
391 } else {
392 PlanningMode::Disabled
393 };
394 self
395 }
396
397 pub fn with_goal_tracking(mut self, enabled: bool) -> Self {
399 self.goal_tracking = enabled;
400 self
401 }
402
403 pub fn with_builtin_skills(mut self) -> Self {
405 self.skill_registry = Some(Arc::new(crate::skills::SkillRegistry::with_builtins()));
406 self
407 }
408
409 pub fn with_skill_registry(mut self, registry: Arc<crate::skills::SkillRegistry>) -> Self {
411 self.skill_registry = Some(registry);
412 self
413 }
414
415 pub fn with_skill_dirs(mut self, dirs: impl IntoIterator<Item = impl Into<PathBuf>>) -> Self {
418 self.skill_dirs.extend(dirs.into_iter().map(Into::into));
419 self
420 }
421
422 pub fn with_skills_from_dir(mut self, dir: impl AsRef<std::path::Path>) -> Self {
424 let registry = self
425 .skill_registry
426 .unwrap_or_else(|| Arc::new(crate::skills::SkillRegistry::new()));
427 if let Err(e) = registry.load_from_dir(&dir) {
428 tracing::warn!(
429 dir = %dir.as_ref().display(),
430 error = %e,
431 "Failed to load skills from directory — continuing without them"
432 );
433 }
434 self.skill_registry = Some(registry);
435 self
436 }
437
438 pub fn with_memory(mut self, store: Arc<dyn MemoryStore>) -> Self {
440 self.memory_store = Some(store);
441 self
442 }
443
444 pub fn with_file_memory(mut self, dir: impl Into<PathBuf>) -> Self {
450 self.file_memory_dir = Some(dir.into());
451 self
452 }
453
454 pub fn with_session_store(mut self, store: Arc<dyn crate::store::SessionStore>) -> Self {
456 self.session_store = Some(store);
457 self
458 }
459
460 pub fn with_file_session_store(mut self, dir: impl Into<PathBuf>) -> Self {
462 let dir = dir.into();
463 match tokio::runtime::Handle::try_current() {
464 Ok(handle) => {
465 match tokio::task::block_in_place(|| {
466 handle.block_on(crate::store::FileSessionStore::new(dir))
467 }) {
468 Ok(store) => {
469 self.session_store =
470 Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>);
471 }
472 Err(e) => {
473 tracing::warn!("Failed to create file session store: {}", e);
474 }
475 }
476 }
477 Err(_) => {
478 tracing::warn!(
479 "No async runtime available for file session store — persistence disabled"
480 );
481 }
482 }
483 self
484 }
485
486 pub fn with_session_id(mut self, id: impl Into<String>) -> Self {
488 self.session_id = Some(id.into());
489 self
490 }
491
492 pub fn with_auto_save(mut self, enabled: bool) -> Self {
494 self.auto_save = enabled;
495 self
496 }
497
498 pub fn with_parse_retries(mut self, max: u32) -> Self {
504 self.max_parse_retries = Some(max);
505 self
506 }
507
508 pub fn with_tool_timeout(mut self, timeout_ms: u64) -> Self {
514 self.tool_timeout_ms = Some(timeout_ms);
515 self
516 }
517
518 pub fn with_circuit_breaker(mut self, threshold: u32) -> Self {
524 self.circuit_breaker_threshold = Some(threshold);
525 self
526 }
527
528 pub fn with_resilience_defaults(self) -> Self {
534 self.with_parse_retries(2)
535 .with_tool_timeout(120_000)
536 .with_circuit_breaker(3)
537 }
538
539 pub fn with_sandbox(mut self, config: crate::sandbox::SandboxConfig) -> Self {
558 self.sandbox_config = Some(config);
559 self
560 }
561
562 pub fn with_sandbox_handle(mut self, handle: Arc<dyn crate::sandbox::BashSandbox>) -> Self {
570 self.sandbox_handle = Some(handle);
571 self
572 }
573
574 pub fn with_auto_compact(mut self, enabled: bool) -> Self {
579 self.auto_compact = enabled;
580 self
581 }
582
583 pub fn with_auto_compact_threshold(mut self, threshold: f32) -> Self {
585 self.auto_compact_threshold = Some(threshold.clamp(0.0, 1.0));
586 self
587 }
588
589 pub fn with_continuation(mut self, enabled: bool) -> Self {
594 self.continuation_enabled = Some(enabled);
595 self
596 }
597
598 pub fn with_max_continuation_turns(mut self, turns: u32) -> Self {
600 self.max_continuation_turns = Some(turns);
601 self
602 }
603
604 pub fn with_mcp(mut self, manager: Arc<crate::mcp::manager::McpManager>) -> Self {
609 self.mcp_manager = Some(manager);
610 self
611 }
612
613 pub fn with_temperature(mut self, temperature: f32) -> Self {
614 self.temperature = Some(temperature);
615 self
616 }
617
618 pub fn with_thinking_budget(mut self, budget: usize) -> Self {
619 self.thinking_budget = Some(budget);
620 self
621 }
622
623 pub fn with_max_tool_rounds(mut self, rounds: usize) -> Self {
628 self.max_tool_rounds = Some(rounds);
629 self
630 }
631
632 pub fn with_prompt_slots(mut self, slots: SystemPromptSlots) -> Self {
637 self.prompt_slots = Some(slots);
638 self
639 }
640
641 pub fn with_hook_executor(mut self, executor: Arc<dyn crate::hooks::HookExecutor>) -> Self {
647 self.hook_executor = Some(executor);
648 self
649 }
650}
651
/// Entry point that owns the loaded configuration and shared resources from
/// which sessions are built.
pub struct Agent {
    /// LLM client created from the config's default LLM settings.
    llm_client: Arc<dyn LlmClient>,
    /// The loaded configuration this agent was built from.
    code_config: CodeConfig,
    /// Base agent configuration that session configs start from.
    config: AgentConfig,
    /// MCP manager built from the config's `mcp_servers`; `None` when the
    /// config lists no servers.
    global_mcp: Option<Arc<crate::mcp::manager::McpManager>>,
    /// Cached `(server name, tool)` pairs from the global MCP manager;
    /// replaced wholesale by `refresh_mcp_tools`.
    global_mcp_tools: std::sync::Mutex<Vec<(String, crate::mcp::McpTool)>>,
}
670
671impl std::fmt::Debug for Agent {
672 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
673 f.debug_struct("Agent").finish()
674 }
675}
676
677impl Agent {
678 pub async fn new(config_source: impl Into<String>) -> Result<Self> {
682 let source = config_source.into();
683
684 let expanded = if let Some(rest) = source.strip_prefix("~/") {
686 let home = std::env::var_os("HOME").or_else(|| std::env::var_os("USERPROFILE"));
687 if let Some(home) = home {
688 PathBuf::from(home).join(rest).display().to_string()
689 } else {
690 source.clone()
691 }
692 } else {
693 source.clone()
694 };
695
696 let path = Path::new(&expanded);
697
698 let config = if matches!(
699 path.extension().and_then(|ext| ext.to_str()),
700 Some("hcl" | "json")
701 ) {
702 if !path.exists() {
703 return Err(CodeError::Config(format!(
704 "Config file not found: {}",
705 path.display()
706 )));
707 }
708
709 CodeConfig::from_file(path)
710 .with_context(|| format!("Failed to load config: {}", path.display()))?
711 } else {
712 CodeConfig::from_hcl(&source).context("Failed to parse config as HCL string")?
714 };
715
716 Self::from_config(config).await
717 }
718
719 pub async fn create(config_source: impl Into<String>) -> Result<Self> {
724 Self::new(config_source).await
725 }
726
727 pub async fn from_config(config: CodeConfig) -> Result<Self> {
729 let llm_config = config
730 .default_llm_config()
731 .context("default_model must be set in 'provider/model' format with a valid API key")?;
732 let llm_client = crate::llm::create_client_with_config(llm_config);
733
734 let agent_config = AgentConfig {
735 max_tool_rounds: config
736 .max_tool_rounds
737 .unwrap_or(AgentConfig::default().max_tool_rounds),
738 ..AgentConfig::default()
739 };
740
741 let (global_mcp, global_mcp_tools) = if config.mcp_servers.is_empty() {
743 (None, vec![])
744 } else {
745 let manager = Arc::new(crate::mcp::manager::McpManager::new());
746 for server in &config.mcp_servers {
747 if !server.enabled {
748 continue;
749 }
750 manager.register_server(server.clone()).await;
751 if let Err(e) = manager.connect(&server.name).await {
752 tracing::warn!(
753 server = %server.name,
754 error = %e,
755 "Failed to connect to MCP server — skipping"
756 );
757 }
758 }
759 let tools = manager.get_all_tools().await;
761 (Some(manager), tools)
762 };
763
764 let mut agent = Agent {
765 llm_client,
766 code_config: config,
767 config: agent_config,
768 global_mcp,
769 global_mcp_tools: std::sync::Mutex::new(global_mcp_tools),
770 };
771
772 let registry = Arc::new(crate::skills::SkillRegistry::with_builtins());
774 for dir in &agent.code_config.skill_dirs.clone() {
775 if let Err(e) = registry.load_from_dir(dir) {
776 tracing::warn!(
777 dir = %dir.display(),
778 error = %e,
779 "Failed to load skills from directory — skipping"
780 );
781 }
782 }
783 agent.config.skill_registry = Some(registry);
784
785 Ok(agent)
786 }
787
788 pub async fn refresh_mcp_tools(&self) -> Result<()> {
796 if let Some(ref mcp) = self.global_mcp {
797 let fresh = mcp.get_all_tools().await;
798 *self
799 .global_mcp_tools
800 .lock()
801 .expect("global_mcp_tools lock poisoned") = fresh;
802 }
803 Ok(())
804 }
805
806 pub fn session(
811 &self,
812 workspace: impl Into<String>,
813 options: Option<SessionOptions>,
814 ) -> Result<AgentSession> {
815 let opts = options.unwrap_or_default();
816
817 let mut merged_opts = match (&self.global_mcp, &opts.mcp_manager) {
820 (Some(global), Some(session)) => {
821 let global = Arc::clone(global);
822 let session_mgr = Arc::clone(session);
823 match tokio::runtime::Handle::try_current() {
824 Ok(handle) => {
825 let global_for_merge = Arc::clone(&global);
826 tokio::task::block_in_place(|| {
827 handle.block_on(async move {
828 for config in session_mgr.all_configs().await {
829 let name = config.name.clone();
830 global_for_merge.register_server(config).await;
831 if let Err(e) = global_for_merge.connect(&name).await {
832 tracing::warn!(
833 server = %name,
834 error = %e,
835 "Failed to connect session-level MCP server — skipping"
836 );
837 }
838 }
839 })
840 });
841 }
842 Err(_) => {
843 tracing::warn!(
844 "No async runtime available to merge session-level MCP servers \
845 into global manager — session MCP servers will not be available"
846 );
847 }
848 }
849 SessionOptions {
850 mcp_manager: Some(Arc::clone(&global)),
851 ..opts
852 }
853 }
854 (Some(global), None) => SessionOptions {
855 mcp_manager: Some(Arc::clone(global)),
856 ..opts
857 },
858 _ => opts,
859 };
860
861 let session_id = merged_opts
862 .session_id
863 .clone()
864 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
865 merged_opts.session_id = Some(session_id.clone());
866 let llm_client = self.resolve_session_llm_client(&merged_opts, Some(&session_id))?;
867
868 self.build_session(workspace.into(), llm_client, &merged_opts)
869 }
870
871 pub fn session_for_agent(
886 &self,
887 workspace: impl Into<String>,
888 def: &crate::subagent::AgentDefinition,
889 extra: Option<SessionOptions>,
890 ) -> Result<AgentSession> {
891 let mut opts = extra.unwrap_or_default();
892
893 if opts.permission_checker.is_none()
895 && (!def.permissions.allow.is_empty() || !def.permissions.deny.is_empty())
896 {
897 opts.permission_checker = Some(Arc::new(def.permissions.clone()));
898 }
899
900 if opts.max_tool_rounds.is_none() {
902 if let Some(steps) = def.max_steps {
903 opts.max_tool_rounds = Some(steps);
904 }
905 }
906
907 if opts.model.is_none() {
909 if let Some(ref m) = def.model {
910 let provider = m.provider.as_deref().unwrap_or("anthropic");
911 opts.model = Some(format!("{}/{}", provider, m.model));
912 }
913 }
914
915 if let Some(ref prompt) = def.prompt {
922 let slots = opts
923 .prompt_slots
924 .get_or_insert_with(crate::prompts::SystemPromptSlots::default);
925 if slots.extra.is_none() {
926 slots.extra = Some(prompt.clone());
927 }
928 }
929
930 self.session(workspace, Some(opts))
931 }
932
933 pub fn resume_session(
941 &self,
942 session_id: &str,
943 options: SessionOptions,
944 ) -> Result<AgentSession> {
945 let store = options.session_store.as_ref().ok_or_else(|| {
946 crate::error::CodeError::Session(
947 "resume_session requires a session_store in SessionOptions".to_string(),
948 )
949 })?;
950
951 let data = match tokio::runtime::Handle::try_current() {
953 Ok(handle) => tokio::task::block_in_place(|| handle.block_on(store.load(session_id)))
954 .map_err(|e| {
955 crate::error::CodeError::Session(format!(
956 "Failed to load session {}: {}",
957 session_id, e
958 ))
959 })?,
960 Err(_) => {
961 return Err(crate::error::CodeError::Session(
962 "No async runtime available for session resume".to_string(),
963 ))
964 }
965 };
966
967 let data = data.ok_or_else(|| {
968 crate::error::CodeError::Session(format!("Session not found: {}", session_id))
969 })?;
970
971 let mut opts = options;
973 opts.session_id = Some(data.id.clone());
974 let llm_client = self.resolve_session_llm_client(&opts, Some(&data.id))?;
975
976 let session = self.build_session(data.config.workspace.clone(), llm_client, &opts)?;
977
978 *write_or_recover(&session.history) = data.messages;
980
981 Ok(session)
982 }
983
984 fn resolve_session_llm_client(
985 &self,
986 opts: &SessionOptions,
987 session_id: Option<&str>,
988 ) -> Result<Arc<dyn LlmClient>> {
989 let model_ref = if let Some(ref model) = opts.model {
990 model.as_str()
991 } else {
992 if opts.temperature.is_some() || opts.thinking_budget.is_some() {
993 tracing::warn!(
994 "temperature/thinking_budget set without model override — these will be ignored. \
995 Use with_model() to apply LLM parameter overrides."
996 );
997 }
998 self.code_config
999 .default_model
1000 .as_deref()
1001 .context("default_model must be set in 'provider/model' format")?
1002 };
1003
1004 let (provider_name, model_id) = model_ref
1005 .split_once('/')
1006 .context("model format must be 'provider/model' (e.g., 'openai/gpt-4o')")?;
1007
1008 let mut llm_config = self
1009 .code_config
1010 .llm_config(provider_name, model_id)
1011 .with_context(|| {
1012 format!("provider '{provider_name}' or model '{model_id}' not found in config")
1013 })?;
1014
1015 if opts.model.is_some() {
1016 if let Some(temp) = opts.temperature {
1017 llm_config = llm_config.with_temperature(temp);
1018 }
1019 if let Some(budget) = opts.thinking_budget {
1020 llm_config = llm_config.with_thinking_budget(budget);
1021 }
1022 }
1023
1024 if let Some(session_id) = session_id {
1025 llm_config = llm_config.with_session_id(session_id);
1026 }
1027
1028 Ok(crate::llm::create_client_with_config(llm_config))
1029 }
1030
1031 fn build_session(
1032 &self,
1033 workspace: String,
1034 llm_client: Arc<dyn LlmClient>,
1035 opts: &SessionOptions,
1036 ) -> Result<AgentSession> {
1037 let canonical = safe_canonicalize(Path::new(&workspace));
1038
1039 let tool_executor = Arc::new(ToolExecutor::new(canonical.display().to_string()));
1040 let search_builtin_cfg = self
1041 .code_config
1042 .agentic_search
1043 .clone()
1044 .unwrap_or_default()
1045 .normalized();
1046 let parse_builtin_cfg = self
1047 .code_config
1048 .agentic_parse
1049 .clone()
1050 .unwrap_or_default()
1051 .normalized();
1052 let document_parser_cfg = self
1053 .code_config
1054 .document_parser
1055 .clone()
1056 .unwrap_or_default()
1057 .normalized();
1058 let effective_document_parser_registry =
1059 crate::document_registry_factory::resolve_document_parser_registry(
1060 opts.document_parser_registry.clone(),
1061 document_parser_cfg.clone(),
1062 opts.document_ocr_provider.clone(),
1063 );
1064 let effective_document_pipeline_registry =
1065 opts.document_pipeline_registry.clone().unwrap_or_else(|| {
1066 Arc::new(
1067 crate::document_pipeline_defaults::build_default_document_pipeline_registry_for_config(
1068 &document_parser_cfg,
1069 ),
1070 )
1071 });
1072
1073 {
1075 use crate::tools::register_agentic_tools;
1076 register_agentic_tools(
1077 tool_executor.registry(),
1078 Some(Arc::clone(&llm_client)),
1079 search_builtin_cfg.enabled,
1080 parse_builtin_cfg.enabled,
1081 );
1082 }
1083
1084 tool_executor
1086 .registry()
1087 .set_document_parsers(Arc::clone(&effective_document_parser_registry));
1088 tool_executor
1089 .registry()
1090 .set_document_pipeline(Arc::clone(&effective_document_pipeline_registry));
1091 if let Some(ref search_config) = self.code_config.search {
1092 tool_executor
1093 .registry()
1094 .set_search_config(search_config.clone());
1095 }
1096 tool_executor
1097 .registry()
1098 .set_agentic_search_config(search_builtin_cfg.clone());
1099 tool_executor
1100 .registry()
1101 .set_agentic_parse_config(parse_builtin_cfg.clone());
1102 tool_executor
1103 .registry()
1104 .set_document_parser_config(document_parser_cfg.clone());
1105
1106 let agent_registry = {
1110 use crate::subagent::{load_agents_from_dir, AgentRegistry};
1111 use crate::tools::register_task_with_mcp;
1112 let registry = AgentRegistry::new();
1113 for dir in self
1114 .code_config
1115 .agent_dirs
1116 .iter()
1117 .chain(opts.agent_dirs.iter())
1118 {
1119 for agent in load_agents_from_dir(dir) {
1120 registry.register(agent);
1121 }
1122 }
1123 let registry = Arc::new(registry);
1124 register_task_with_mcp(
1125 tool_executor.registry(),
1126 Arc::clone(&llm_client),
1127 Arc::clone(®istry),
1128 canonical.display().to_string(),
1129 opts.mcp_manager.clone(),
1130 );
1131 registry
1132 };
1133
1134 if let Some(ref mcp) = opts.mcp_manager {
1137 let all_tools: Vec<(String, crate::mcp::McpTool)> = if std::ptr::eq(
1140 Arc::as_ptr(mcp),
1141 self.global_mcp
1142 .as_ref()
1143 .map(Arc::as_ptr)
1144 .unwrap_or(std::ptr::null()),
1145 ) {
1146 self.global_mcp_tools
1148 .lock()
1149 .expect("global_mcp_tools lock poisoned")
1150 .clone()
1151 } else {
1152 match tokio::runtime::Handle::try_current() {
1154 Ok(handle) => {
1155 tokio::task::block_in_place(|| handle.block_on(mcp.get_all_tools()))
1156 }
1157 Err(_) => {
1158 tracing::warn!(
1159 "No async runtime available for session-level MCP tools — \
1160 MCP tools will not be registered"
1161 );
1162 vec![]
1163 }
1164 }
1165 };
1166
1167 let mut by_server: std::collections::HashMap<String, Vec<crate::mcp::McpTool>> =
1168 std::collections::HashMap::new();
1169 for (server, tool) in all_tools {
1170 by_server.entry(server).or_default().push(tool);
1171 }
1172 for (server_name, tools) in by_server {
1173 for tool in
1174 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(mcp))
1175 {
1176 tool_executor.register_dynamic_tool(tool);
1177 }
1178 }
1179 }
1180
1181 let tool_defs = tool_executor.definitions();
1182
1183 let mut prompt_slots = opts
1185 .prompt_slots
1186 .clone()
1187 .unwrap_or_else(|| self.config.prompt_slots.clone());
1188
1189 let agents_md_path = canonical.join("AGENTS.md");
1191 if agents_md_path.exists() && agents_md_path.is_file() {
1192 match std::fs::read_to_string(&agents_md_path) {
1193 Ok(content) if !content.trim().is_empty() => {
1194 tracing::info!(
1195 path = %agents_md_path.display(),
1196 "Auto-loaded AGENTS.md from workspace root"
1197 );
1198 prompt_slots.extra = match prompt_slots.extra {
1199 Some(existing) => Some(format!(
1200 "{}\n\n# Project Instructions (AGENTS.md)\n\n{}",
1201 existing, content
1202 )),
1203 None => Some(format!("# Project Instructions (AGENTS.md)\n\n{}", content)),
1204 };
1205 }
1206 Ok(_) => {
1207 tracing::debug!(
1208 path = %agents_md_path.display(),
1209 "AGENTS.md exists but is empty — skipping"
1210 );
1211 }
1212 Err(e) => {
1213 tracing::warn!(
1214 path = %agents_md_path.display(),
1215 error = %e,
1216 "Failed to read AGENTS.md — skipping"
1217 );
1218 }
1219 }
1220 }
1221
1222 let base_registry = self
1226 .config
1227 .skill_registry
1228 .as_deref()
1229 .map(|r| r.fork())
1230 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
1231 if let Some(ref r) = opts.skill_registry {
1233 for skill in r.all() {
1234 base_registry.register_unchecked(skill);
1235 }
1236 }
1237 for dir in &opts.skill_dirs {
1239 if let Err(e) = base_registry.load_from_dir(dir) {
1240 tracing::warn!(
1241 dir = %dir.display(),
1242 error = %e,
1243 "Failed to load session skill dir — skipping"
1244 );
1245 }
1246 }
1247 let effective_registry = Arc::new(base_registry);
1248
1249 if !opts.plugins.is_empty() {
1252 use crate::plugin::PluginContext;
1253 let plugin_ctx = PluginContext::new()
1254 .with_llm(Arc::clone(&self.llm_client))
1255 .with_skill_registry(Arc::clone(&effective_registry))
1256 .with_document_parsers(Arc::clone(&effective_document_parser_registry));
1257 let plugin_registry = tool_executor.registry();
1258 for plugin in &opts.plugins {
1259 tracing::info!("Loading plugin '{}' v{}", plugin.name(), plugin.version());
1260 match plugin.load(plugin_registry, &plugin_ctx) {
1261 Ok(()) => {
1262 for skill in plugin.skills() {
1263 tracing::debug!(
1264 "Plugin '{}' registered skill '{}'",
1265 plugin.name(),
1266 skill.name
1267 );
1268 effective_registry.register_unchecked(skill);
1269 }
1270 }
1271 Err(e) => {
1272 tracing::error!("Plugin '{}' failed to load: {}", plugin.name(), e);
1273 }
1274 }
1275 }
1276 }
1277
1278 let skill_prompt = effective_registry.to_system_prompt();
1280 if !skill_prompt.is_empty() {
1281 prompt_slots.extra = match prompt_slots.extra {
1282 Some(existing) => Some(format!("{}\n\n{}", existing, skill_prompt)),
1283 None => Some(skill_prompt),
1284 };
1285 }
1286
1287 let mut init_warning: Option<String> = None;
1289 let memory = {
1290 let store = if let Some(ref store) = opts.memory_store {
1291 Some(Arc::clone(store))
1292 } else if let Some(ref dir) = opts.file_memory_dir {
1293 match tokio::runtime::Handle::try_current() {
1294 Ok(handle) => {
1295 let dir = dir.clone();
1296 match tokio::task::block_in_place(|| {
1297 handle.block_on(FileMemoryStore::new(dir))
1298 }) {
1299 Ok(store) => Some(Arc::new(store) as Arc<dyn MemoryStore>),
1300 Err(e) => {
1301 let msg = format!("Failed to create file memory store: {}", e);
1302 tracing::warn!("{}", msg);
1303 init_warning = Some(msg);
1304 None
1305 }
1306 }
1307 }
1308 Err(_) => {
1309 let msg =
1310 "No async runtime available for file memory store — memory disabled"
1311 .to_string();
1312 tracing::warn!("{}", msg);
1313 init_warning = Some(msg);
1314 None
1315 }
1316 }
1317 } else {
1318 None
1319 };
1320 store.map(|s| Arc::new(crate::memory::AgentMemory::new(s)))
1321 };
1322
1323 let base = self.config.clone();
1324 let config = AgentConfig {
1325 prompt_slots,
1326 tools: tool_defs,
1327 security_provider: opts.security_provider.clone(),
1328 permission_checker: opts.permission_checker.clone(),
1329 confirmation_manager: opts.confirmation_manager.clone(),
1330 context_providers: opts.context_providers.clone(),
1331 planning_mode: opts.planning_mode,
1332 goal_tracking: opts.goal_tracking,
1333 skill_registry: Some(Arc::clone(&effective_registry)),
1334 max_parse_retries: opts.max_parse_retries.unwrap_or(base.max_parse_retries),
1335 tool_timeout_ms: opts.tool_timeout_ms.or(base.tool_timeout_ms),
1336 circuit_breaker_threshold: opts
1337 .circuit_breaker_threshold
1338 .unwrap_or(base.circuit_breaker_threshold),
1339 auto_compact: opts.auto_compact,
1340 auto_compact_threshold: opts
1341 .auto_compact_threshold
1342 .unwrap_or(crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD),
1343 max_context_tokens: base.max_context_tokens,
1344 llm_client: Some(Arc::clone(&llm_client)),
1345 memory: memory.clone(),
1346 continuation_enabled: opts
1347 .continuation_enabled
1348 .unwrap_or(base.continuation_enabled),
1349 max_continuation_turns: opts
1350 .max_continuation_turns
1351 .unwrap_or(base.max_continuation_turns),
1352 max_tool_rounds: opts.max_tool_rounds.unwrap_or(base.max_tool_rounds),
1353 ..base
1354 };
1355
1356 {
1360 use crate::tools::register_skill;
1361 register_skill(
1362 tool_executor.registry(),
1363 Arc::clone(&llm_client),
1364 Arc::clone(&effective_registry),
1365 Arc::clone(&tool_executor),
1366 config.clone(),
1367 );
1368 }
1369
1370 let (agent_event_tx, _) = broadcast::channel::<crate::agent::AgentEvent>(256);
1373 let command_queue = if let Some(ref queue_config) = opts.queue_config {
1374 let session_id = uuid::Uuid::new_v4().to_string();
1375 let rt = tokio::runtime::Handle::try_current();
1376
1377 match rt {
1378 Ok(handle) => {
1379 let queue = tokio::task::block_in_place(|| {
1381 handle.block_on(SessionLaneQueue::new(
1382 &session_id,
1383 queue_config.clone(),
1384 agent_event_tx.clone(),
1385 ))
1386 });
1387 match queue {
1388 Ok(q) => {
1389 let q = Arc::new(q);
1391 let q2 = Arc::clone(&q);
1392 tokio::task::block_in_place(|| {
1393 handle.block_on(async { q2.start().await.ok() })
1394 });
1395 Some(q)
1396 }
1397 Err(e) => {
1398 tracing::warn!("Failed to create session lane queue: {}", e);
1399 None
1400 }
1401 }
1402 }
1403 Err(_) => {
1404 tracing::warn!(
1405 "No async runtime available for queue creation — queue disabled"
1406 );
1407 None
1408 }
1409 }
1410 } else {
1411 None
1412 };
1413
1414 let mut tool_context = ToolContext::new(canonical.clone());
1416 if let Some(ref search_config) = self.code_config.search {
1417 tool_context = tool_context.with_search_config(search_config.clone());
1418 }
1419 tool_context = tool_context.with_agentic_search_config(search_builtin_cfg);
1420 tool_context = tool_context.with_agentic_parse_config(parse_builtin_cfg);
1421 tool_context = tool_context.with_document_parser_config(document_parser_cfg);
1422 tool_context = tool_context.with_agent_event_tx(agent_event_tx);
1423
1424 tool_context =
1426 tool_context.with_document_parsers(Arc::clone(&effective_document_parser_registry));
1427 tool_context =
1428 tool_context.with_document_pipeline(Arc::clone(&effective_document_pipeline_registry));
1429
1430 if let Some(handle) = opts.sandbox_handle.clone() {
1432 tool_executor.registry().set_sandbox(Arc::clone(&handle));
1433 tool_context = tool_context.with_sandbox(handle);
1434 } else if opts.sandbox_config.is_some() {
1435 tracing::warn!(
1436 "sandbox_config is set but no sandbox_handle was provided \
1437 — bash commands will run locally"
1438 );
1439 }
1440
1441 let session_id = opts
1442 .session_id
1443 .clone()
1444 .unwrap_or_else(|| uuid::Uuid::new_v4().to_string());
1445
1446 let session_store = if opts.session_store.is_some() {
1448 opts.session_store.clone()
1449 } else if let Some(ref dir) = self.code_config.sessions_dir {
1450 match tokio::runtime::Handle::try_current() {
1451 Ok(handle) => {
1452 let dir = dir.clone();
1453 match tokio::task::block_in_place(|| {
1454 handle.block_on(crate::store::FileSessionStore::new(dir))
1455 }) {
1456 Ok(store) => Some(Arc::new(store) as Arc<dyn crate::store::SessionStore>),
1457 Err(e) => {
1458 tracing::warn!(
1459 "Failed to create session store from sessions_dir: {}",
1460 e
1461 );
1462 None
1463 }
1464 }
1465 }
1466 Err(_) => {
1467 tracing::warn!(
1468 "No async runtime for sessions_dir store — persistence disabled"
1469 );
1470 None
1471 }
1472 }
1473 } else {
1474 None
1475 };
1476
1477 let (cron_scheduler, cron_rx) = CronScheduler::new();
1481 let mut command_registry = CommandRegistry::new();
1482 command_registry.register(Arc::new(LoopCommand {
1483 scheduler: Arc::clone(&cron_scheduler),
1484 }));
1485 command_registry.register(Arc::new(CronListCommand {
1486 scheduler: Arc::clone(&cron_scheduler),
1487 }));
1488 command_registry.register(Arc::new(CronCancelCommand {
1489 scheduler: Arc::clone(&cron_scheduler),
1490 }));
1491
1492 Ok(AgentSession {
1493 llm_client,
1494 tool_executor,
1495 tool_context,
1496 memory: config.memory.clone(),
1497 config,
1498 workspace: canonical,
1499 session_id,
1500 history: RwLock::new(Vec::new()),
1501 command_queue,
1502 session_store,
1503 auto_save: opts.auto_save,
1504 hook_engine: Arc::new(crate::hooks::HookEngine::new()),
1505 ahp_executor: opts.hook_executor.clone(),
1506 init_warning,
1507 command_registry: std::sync::Mutex::new(command_registry),
1508 model_name: opts
1509 .model
1510 .clone()
1511 .or_else(|| self.code_config.default_model.clone())
1512 .unwrap_or_else(|| "unknown".to_string()),
1513 mcp_manager: opts
1514 .mcp_manager
1515 .clone()
1516 .or_else(|| self.global_mcp.clone())
1517 .unwrap_or_else(|| Arc::new(crate::mcp::manager::McpManager::new())),
1518 agent_registry,
1519 cron_scheduler,
1520 cron_rx: tokio::sync::Mutex::new(cron_rx),
1521 is_processing_cron: AtomicBool::new(false),
1522 cron_started: AtomicBool::new(false),
1523 cancel_token: Arc::new(tokio::sync::Mutex::new(None)),
1524 active_tools: Arc::new(tokio::sync::RwLock::new(HashMap::new())),
1525 task_manager: Arc::new(TaskManager::new()),
1526 progress_tracker: Arc::new(tokio::sync::RwLock::new(ProgressTracker::new(30))),
1527 })
1528 }
1529}
1530
#[derive(Debug, Clone)]
/// Outcome of an ephemeral "btw" side question asked against a session.
///
/// Produced by `AgentSession::btw` / `AgentSession::btw_with_context`.
pub struct BtwResult {
    /// The question that was asked, with surrounding whitespace trimmed.
    pub question: String,
    /// Text of the model's reply to the side question.
    pub answer: String,
    /// Token usage reported by the LLM response for this single call.
    pub usage: crate::llm::TokenUsage,
}
1548
/// A single agent session: the LLM client, tools, conversation history and
/// the runtime bookkeeping needed to run prompts against a workspace.
pub struct AgentSession {
    /// LLM client handed to every agent loop and used directly for "btw" calls.
    llm_client: Arc<dyn LlmClient>,
    /// Executes tools and supplies the current tool definitions.
    tool_executor: Arc<ToolExecutor>,
    /// Tool context passed to agent loops and to `bash`.
    tool_context: ToolContext,
    /// Base agent configuration; cloned and adjusted for each agent loop.
    config: AgentConfig,
    /// Workspace path reported in command contexts, `Debug` output and saves.
    workspace: PathBuf,
    /// Identifier of this session.
    session_id: String,
    /// Internal conversation history, used when a caller passes no explicit history.
    history: RwLock<Vec<Message>>,
    /// Optional lane queue; `None` when the session has no queue.
    command_queue: Option<Arc<SessionLaneQueue>>,
    /// Optional agent memory exposed through `memory()`.
    memory: Option<Arc<crate::memory::AgentMemory>>,
    /// Optional persistence backend used by `save` and the btw runtime context.
    session_store: Option<Arc<dyn crate::store::SessionStore>>,
    /// When true, `send` / `send_with_attachments` save after updating internal history.
    auto_save: bool,
    /// Built-in hook engine; used as the hook executor when `ahp_executor` is `None`.
    hook_engine: Arc<crate::hooks::HookEngine>,
    /// Optional external hook executor; takes precedence over `hook_engine`
    /// when an agent loop is built.
    ahp_executor: Option<Arc<dyn crate::hooks::HookExecutor>>,
    /// Warning recorded at construction time; logged by `send` before running the loop.
    init_warning: Option<String>,
    /// Registry of slash commands dispatched by `send` and `stream`.
    command_registry: std::sync::Mutex<CommandRegistry>,
    /// Model name reported in command contexts.
    model_name: String,
    /// MCP manager used to add servers to the live session.
    mcp_manager: Arc<crate::mcp::manager::McpManager>,
    /// Registry that receives agents loaded by `register_agent_dir`.
    agent_registry: Arc<crate::subagent::AgentRegistry>,
    /// Scheduler for recurring prompts; started lazily on the first `send` / `stream`.
    cron_scheduler: Arc<CronScheduler>,
    /// Receiver of scheduled fires; drained by `send` after a successful run.
    cron_rx: tokio::sync::Mutex<mpsc::UnboundedReceiver<ScheduledFire>>,
    /// Set while `send` is draining scheduled fires, so nested sends do not drain again.
    is_processing_cron: AtomicBool,
    /// Whether `CronScheduler::start` has already been invoked for this session.
    cron_started: AtomicBool,
    /// Cancellation token of the operation currently in flight, if any.
    cancel_token: Arc<tokio::sync::Mutex<Option<tokio_util::sync::CancellationToken>>>,
    /// Tools currently tracked as running, keyed by tool-call id.
    active_tools: Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
    /// Task manager shared with every agent loop.
    task_manager: Arc<TaskManager>,
    /// Progress tracker shared with every agent loop.
    progress_tracker: Arc<tokio::sync::RwLock<ProgressTracker>>,
}
1611
#[derive(Debug, Clone)]
/// Bookkeeping entry for a tool call that has started but not yet finished.
struct ActiveToolSnapshot {
    /// Name of the running tool, taken from the `ToolStart` event.
    tool_name: String,
    /// Wall-clock start time in milliseconds since the Unix epoch.
    started_at_ms: u64,
}
1617
1618impl std::fmt::Debug for AgentSession {
1619 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
1620 f.debug_struct("AgentSession")
1621 .field("session_id", &self.session_id)
1622 .field("workspace", &self.workspace.display().to_string())
1623 .field("auto_save", &self.auto_save)
1624 .finish()
1625 }
1626}
1627
1628impl AgentSession {
/// Current wall-clock time in milliseconds since the Unix epoch.
///
/// Returns `0` if the system clock reports a time before the epoch.
fn now_ms() -> u64 {
    let since_epoch = std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH);
    match since_epoch {
        Ok(elapsed) => elapsed.as_millis() as u64,
        Err(_) => 0,
    }
}
1635
1636 fn compact_json_value(value: &serde_json::Value) -> String {
1637 let raw = match value {
1638 serde_json::Value::Null => String::new(),
1639 serde_json::Value::String(s) => s.clone(),
1640 _ => serde_json::to_string(value).unwrap_or_default(),
1641 };
1642 let compact = raw.split_whitespace().collect::<Vec<_>>().join(" ");
1643 if compact.len() > 180 {
1644 format!("{}...", truncate_utf8(&compact, 180))
1645 } else {
1646 compact
1647 }
1648 }
1649
1650 async fn apply_runtime_event(
1651 active_tools: &Arc<tokio::sync::RwLock<HashMap<String, ActiveToolSnapshot>>>,
1652 event: &AgentEvent,
1653 ) {
1654 match event {
1655 AgentEvent::ToolStart { id, name } => {
1656 active_tools.write().await.insert(
1657 id.clone(),
1658 ActiveToolSnapshot {
1659 tool_name: name.clone(),
1660 started_at_ms: Self::now_ms(),
1661 },
1662 );
1663 }
1664 AgentEvent::ToolEnd { id, .. }
1665 | AgentEvent::PermissionDenied { tool_id: id, .. }
1666 | AgentEvent::ConfirmationRequired { tool_id: id, .. }
1667 | AgentEvent::ConfirmationReceived { tool_id: id, .. }
1668 | AgentEvent::ConfirmationTimeout { tool_id: id, .. } => {
1669 active_tools.write().await.remove(id);
1670 }
1671 _ => {}
1672 }
1673 }
1674
/// Empties the `active_tools` map, discarding every tracked in-flight tool entry.
async fn clear_runtime_tracking(&self) {
    self.active_tools.write().await.clear();
}
1678
1679 fn build_agent_loop(&self) -> AgentLoop {
1683 let mut config = self.config.clone();
1684 config.hook_engine = Some(if let Some(ref ahp) = self.ahp_executor {
1685 ahp.clone()
1686 } else {
1687 Arc::clone(&self.hook_engine) as Arc<dyn crate::hooks::HookExecutor>
1688 });
1689 config.tools = self.tool_executor.definitions();
1693 let mut agent_loop = AgentLoop::new(
1694 self.llm_client.clone(),
1695 self.tool_executor.clone(),
1696 self.tool_context.clone(),
1697 config,
1698 );
1699 if let Some(ref queue) = self.command_queue {
1700 agent_loop = agent_loop.with_queue(Arc::clone(queue));
1701 }
1702 agent_loop = agent_loop.with_progress_tracker(Arc::clone(&self.progress_tracker));
1703 agent_loop = agent_loop.with_task_manager(Arc::clone(&self.task_manager));
1704 agent_loop
1705 }
1706
1707 fn build_command_context(&self) -> CommandContext {
1709 let history = read_or_recover(&self.history);
1710
1711 let tool_names: Vec<String> = self.config.tools.iter().map(|t| t.name.clone()).collect();
1713
1714 let mut mcp_map: std::collections::HashMap<String, usize> =
1716 std::collections::HashMap::new();
1717 for name in &tool_names {
1718 if let Some(rest) = name.strip_prefix("mcp__") {
1719 if let Some((server, _)) = rest.split_once("__") {
1720 *mcp_map.entry(server.to_string()).or_default() += 1;
1721 }
1722 }
1723 }
1724 let mut mcp_servers: Vec<(String, usize)> = mcp_map.into_iter().collect();
1725 mcp_servers.sort_by(|a, b| a.0.cmp(&b.0));
1726
1727 CommandContext {
1728 session_id: self.session_id.clone(),
1729 workspace: self.workspace.display().to_string(),
1730 model: self.model_name.clone(),
1731 history_len: history.len(),
1732 total_tokens: 0,
1733 total_cost: 0.0,
1734 tool_names,
1735 mcp_servers,
1736 }
1737 }
1738
1739 pub fn command_registry(&self) -> std::sync::MutexGuard<'_, CommandRegistry> {
1743 self.command_registry
1744 .lock()
1745 .expect("command_registry lock poisoned")
1746 }
1747
1748 pub fn register_command(&self, cmd: Arc<dyn crate::commands::SlashCommand>) {
1752 self.command_registry
1753 .lock()
1754 .expect("command_registry lock poisoned")
1755 .register(cmd);
1756 }
1757
/// Returns the session's cron scheduler.
pub fn cron_scheduler(&self) -> &Arc<CronScheduler> {
    &self.cron_scheduler
}
1762
/// Shuts the session down: cancels any in-flight operation (the result of
/// `cancel` is ignored) and stops the cron scheduler.
pub async fn close(&self) {
    let _ = self.cancel().await;
    self.cron_scheduler.stop();
}
1768
1769 fn ensure_cron_started(&self) {
1774 if !self.cron_started.swap(true, Ordering::Relaxed) {
1775 CronScheduler::start(Arc::clone(&self.cron_scheduler));
1776 }
1777 }
1778
1779 pub async fn send(&self, prompt: &str, history: Option<&[Message]>) -> Result<AgentResult> {
1788 self.ensure_cron_started();
1791
1792 if CommandRegistry::is_command(prompt) {
1794 let ctx = self.build_command_context();
1795 let output = self.command_registry().dispatch(prompt, &ctx);
1796 if let Some(output) = output {
1798 if let Some(CommandAction::BtwQuery(ref question)) = output.action {
1800 let result = self.btw(question).await?;
1801 return Ok(AgentResult {
1802 text: result.answer,
1803 messages: history
1804 .map(|h| h.to_vec())
1805 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1806 tool_calls_count: 0,
1807 usage: result.usage,
1808 });
1809 }
1810 return Ok(AgentResult {
1811 text: output.text,
1812 messages: history
1813 .map(|h| h.to_vec())
1814 .unwrap_or_else(|| read_or_recover(&self.history).clone()),
1815 tool_calls_count: 0,
1816 usage: crate::llm::TokenUsage::default(),
1817 });
1818 }
1819 }
1820
1821 if let Some(ref w) = self.init_warning {
1822 tracing::warn!(session_id = %self.session_id, "Session init warning: {}", w);
1823 }
1824 let agent_loop = self.build_agent_loop();
1825 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
1826 let runtime_state = Arc::clone(&self.active_tools);
1827 let runtime_collector = tokio::spawn(async move {
1828 while let Some(event) = runtime_rx.recv().await {
1829 AgentSession::apply_runtime_event(&runtime_state, &event).await;
1830 }
1831 });
1832
1833 let use_internal = history.is_none();
1834 let effective_history = match history {
1835 Some(h) => h.to_vec(),
1836 None => read_or_recover(&self.history).clone(),
1837 };
1838
1839 let cancel_token = tokio_util::sync::CancellationToken::new();
1840 *self.cancel_token.lock().await = Some(cancel_token.clone());
1841 let result = agent_loop
1842 .execute_with_session(
1843 &effective_history,
1844 prompt,
1845 Some(&self.session_id),
1846 Some(runtime_tx),
1847 Some(&cancel_token),
1848 )
1849 .await;
1850 *self.cancel_token.lock().await = None;
1851 let _ = runtime_collector.await;
1852 let result = result?;
1853
1854 if use_internal {
1857 *write_or_recover(&self.history) = result.messages.clone();
1858
1859 if self.auto_save {
1861 if let Err(e) = self.save().await {
1862 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
1863 }
1864 }
1865 }
1866
1867 if !self.is_processing_cron.swap(true, Ordering::Relaxed) {
1872 let fires = {
1873 let mut rx = self.cron_rx.lock().await;
1874 let mut fires: Vec<ScheduledFire> = Vec::new();
1875 while let Ok(fire) = rx.try_recv() {
1876 fires.push(fire);
1877 }
1878 fires
1879 };
1880 for fire in fires {
1881 tracing::debug!(
1882 task_id = %fire.task_id,
1883 "Firing scheduled cron task"
1884 );
1885 if let Err(e) = Box::pin(self.send(&fire.prompt, None)).await {
1886 tracing::warn!(
1887 task_id = %fire.task_id,
1888 "Scheduled task failed: {e}"
1889 );
1890 }
1891 }
1892 self.is_processing_cron.store(false, Ordering::Relaxed);
1893 }
1894
1895 self.clear_runtime_tracking().await;
1896
1897 Ok(result)
1898 }
1899
/// Builds a plain-text snapshot of what the session is doing right now, for
/// injection into a "btw" side question.
///
/// The result is made of optional sections separated by blank lines, each
/// emitted only when it has content: `[active tools]`,
/// `[pending confirmations]`, `[session queue]`, `[pending external tasks]`
/// and `[tracked tasks]`. An empty string is returned when nothing applies.
async fn build_btw_runtime_context(&self) -> String {
    let mut sections = Vec::new();

    // Tools currently tracked as running, with how long each has been running.
    let active_tools = {
        let tools = self.active_tools.read().await;
        let mut items = tools
            .iter()
            .map(|(tool_id, tool)| {
                let elapsed_ms = Self::now_ms().saturating_sub(tool.started_at_ms);
                format!(
                    "- {} [{}] running_for={}ms",
                    tool.tool_name, tool_id, elapsed_ms
                )
            })
            .collect::<Vec<_>>();
        // Map iteration order is arbitrary; sort for stable output.
        items.sort();
        items
    };
    if !active_tools.is_empty() {
        sections.push(format!("[active tools]\n{}", active_tools.join("\n")));
    }

    // Tool calls waiting on a confirmation, when a confirmation manager is configured.
    if let Some(cm) = &self.config.confirmation_manager {
        let pending = cm.pending_confirmations().await;
        if !pending.is_empty() {
            let mut lines = pending
                .into_iter()
                .map(
                    |PendingConfirmationInfo {
                         tool_id,
                         tool_name,
                         args,
                         remaining_ms,
                     }| {
                        let arg_summary = Self::compact_json_value(&args);
                        if arg_summary.is_empty() {
                            format!(
                                "- {} [{}] remaining={}ms",
                                tool_name, tool_id, remaining_ms
                            )
                        } else {
                            format!(
                                "- {} [{}] remaining={}ms {}",
                                tool_name, tool_id, remaining_ms, arg_summary
                            )
                        }
                    },
                )
                .collect::<Vec<_>>();
            lines.sort();
            sections.push(format!("[pending confirmations]\n{}", lines.join("\n")));
        }
    }

    if let Some(queue) = &self.command_queue {
        // Queue totals plus one line per lane that has active or pending work.
        let stats = queue.stats().await;
        if stats.total_active > 0 || stats.total_pending > 0 || stats.external_pending > 0 {
            let mut lines = vec![format!(
                "active={}, pending={}, external_pending={}",
                stats.total_active, stats.total_pending, stats.external_pending
            )];
            let mut lanes = stats
                .lanes
                .into_values()
                .filter(|lane| lane.active > 0 || lane.pending > 0)
                .map(|lane| {
                    format!(
                        "- {:?}: active={}, pending={}, handler={:?}",
                        lane.lane, lane.active, lane.pending, lane.handler_mode
                    )
                })
                .collect::<Vec<_>>();
            lanes.sort();
            lines.extend(lanes);
            sections.push(format!("[session queue]\n{}", lines.join("\n")));
        }

        // At most six pending external tasks are listed.
        let external_tasks = queue.pending_external_tasks().await;
        if !external_tasks.is_empty() {
            let mut lines = external_tasks
                .into_iter()
                .take(6)
                .map(|task| {
                    let payload_summary = Self::compact_json_value(&task.payload);
                    if payload_summary.is_empty() {
                        format!(
                            "- {} {:?} remaining={}ms",
                            task.command_type,
                            task.lane,
                            task.remaining_ms()
                        )
                    } else {
                        format!(
                            "- {} {:?} remaining={}ms {}",
                            task.command_type,
                            task.lane,
                            task.remaining_ms(),
                            payload_summary
                        )
                    }
                })
                .collect::<Vec<_>>();
            lines.sort();
            sections.push(format!("[pending external tasks]\n{}", lines.join("\n")));
        }
    }

    // Active tasks from the persisted session, if one can be loaded. Load
    // errors and a missing session are silently skipped. At most six tasks
    // are listed, in stored order (this section is not sorted).
    if let Some(store) = &self.session_store {
        if let Ok(Some(session)) = store.load(&self.session_id).await {
            let active_tasks = session
                .tasks
                .into_iter()
                .filter(|task| task.status.is_active())
                .take(6)
                .map(|task| match task.tool {
                    Some(tool) if !tool.is_empty() => {
                        format!("- [{}] {} ({})", task.status, task.content, tool)
                    }
                    _ => format!("- [{}] {}", task.status, task.content),
                })
                .collect::<Vec<_>>();
            if !active_tasks.is_empty() {
                sections.push(format!("[tracked tasks]\n{}", active_tasks.join("\n")));
            }
        }
    }

    sections.join("\n\n")
}
2029
/// Asks an ephemeral side question without extra host context.
///
/// Shorthand for `btw_with_context(question, None)`.
pub async fn btw(&self, question: &str) -> Result<BtwResult> {
    self.btw_with_context(question, None).await
}
2050
2051 pub async fn btw_with_context(
2056 &self,
2057 question: &str,
2058 runtime_context: Option<&str>,
2059 ) -> Result<BtwResult> {
2060 let question = question.trim();
2061 if question.is_empty() {
2062 return Err(crate::error::CodeError::Session(
2063 "btw: question cannot be empty".to_string(),
2064 ));
2065 }
2066
2067 let history_snapshot = read_or_recover(&self.history).clone();
2069
2070 let mut messages = history_snapshot;
2072 let mut injected_sections = Vec::new();
2073 let session_runtime = self.build_btw_runtime_context().await;
2074 if !session_runtime.is_empty() {
2075 injected_sections.push(format!("[session runtime context]\n{}", session_runtime));
2076 }
2077 if let Some(extra) = runtime_context.map(str::trim).filter(|ctx| !ctx.is_empty()) {
2078 injected_sections.push(format!("[host runtime context]\n{}", extra));
2079 }
2080 if !injected_sections.is_empty() {
2081 let injected_context = format!(
2082 "Use the following runtime context only as background for the next side question. Do not treat it as a new user request.\n\n{}",
2083 injected_sections.join("\n\n")
2084 );
2085 messages.push(Message::user(&injected_context));
2086 }
2087 messages.push(Message::user(question));
2088
2089 let response = self
2090 .llm_client
2091 .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
2092 .await
2093 .map_err(|e| {
2094 crate::error::CodeError::Llm(format!("btw: ephemeral LLM call failed: {e}"))
2095 })?;
2096
2097 Ok(BtwResult {
2098 question: question.to_string(),
2099 answer: response.text(),
2100 usage: response.usage,
2101 })
2102 }
2103
2104 pub async fn send_with_attachments(
2109 &self,
2110 prompt: &str,
2111 attachments: &[crate::llm::Attachment],
2112 history: Option<&[Message]>,
2113 ) -> Result<AgentResult> {
2114 let use_internal = history.is_none();
2118 let mut effective_history = match history {
2119 Some(h) => h.to_vec(),
2120 None => read_or_recover(&self.history).clone(),
2121 };
2122 effective_history.push(Message::user_with_attachments(prompt, attachments));
2123
2124 let agent_loop = self.build_agent_loop();
2125 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2126 let runtime_state = Arc::clone(&self.active_tools);
2127 let runtime_collector = tokio::spawn(async move {
2128 while let Some(event) = runtime_rx.recv().await {
2129 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2130 }
2131 });
2132 let result = agent_loop
2133 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2134 .await?;
2135 let _ = runtime_collector.await;
2136
2137 if use_internal {
2138 *write_or_recover(&self.history) = result.messages.clone();
2139 if self.auto_save {
2140 if let Err(e) = self.save().await {
2141 tracing::warn!("Auto-save failed for session {}: {}", self.session_id, e);
2142 }
2143 }
2144 }
2145
2146 self.clear_runtime_tracking().await;
2147
2148 Ok(result)
2149 }
2150
2151 pub async fn stream_with_attachments(
2156 &self,
2157 prompt: &str,
2158 attachments: &[crate::llm::Attachment],
2159 history: Option<&[Message]>,
2160 ) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
2161 let (tx, rx) = mpsc::channel(256);
2162 let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
2163 let mut effective_history = match history {
2164 Some(h) => h.to_vec(),
2165 None => read_or_recover(&self.history).clone(),
2166 };
2167 effective_history.push(Message::user_with_attachments(prompt, attachments));
2168
2169 let agent_loop = self.build_agent_loop();
2170 let runtime_state = Arc::clone(&self.active_tools);
2171 let forwarder = tokio::spawn(async move {
2172 let mut forward_enabled = true;
2173 while let Some(event) = runtime_rx.recv().await {
2174 AgentSession::apply_runtime_event(&runtime_state, &event).await;
2175 if forward_enabled && tx.send(event).await.is_err() {
2176 forward_enabled = false;
2177 }
2178 }
2179 });
2180 let handle = tokio::spawn(async move {
2181 let _ = agent_loop
2182 .execute_from_messages(effective_history, None, Some(runtime_tx), None)
2183 .await;
2184 });
2185 let active_tools = Arc::clone(&self.active_tools);
2186 let wrapped_handle = tokio::spawn(async move {
2187 let _ = handle.await;
2188 let _ = forwarder.await;
2189 active_tools.write().await.clear();
2190 });
2191
2192 Ok((rx, wrapped_handle))
2193 }
2194
/// Streaming variant of `send`.
///
/// Returns a receiver of agent events and a join handle for the background
/// work. Slash commands are answered locally and emitted as events; any
/// other prompt runs the agent loop in a spawned task. The session's
/// internal history is not updated by this method.
pub async fn stream(
    &self,
    prompt: &str,
    history: Option<&[Message]>,
) -> Result<(mpsc::Receiver<AgentEvent>, JoinHandle<()>)> {
    self.ensure_cron_started();

    if CommandRegistry::is_command(prompt) {
        let ctx = self.build_command_context();
        // The registry guard is a temporary of this statement and is released
        // before any `.await` below.
        let output = self.command_registry().dispatch(prompt, &ctx);
        if let Some(output) = output {
            let (tx, rx) = mpsc::channel(256);

            if let Some(CommandAction::BtwQuery(question)) = output.action {
                // NOTE(review): unlike `btw_with_context`, this path does not
                // trim the question or inject runtime context — confirm
                // whether that difference is intended.
                let llm_client = self.llm_client.clone();
                let history_snapshot = read_or_recover(&self.history).clone();
                let handle = tokio::spawn(async move {
                    let mut messages = history_snapshot;
                    messages.push(Message::user(&question));
                    match llm_client
                        .complete(&messages, Some(crate::prompts::BTW_SYSTEM), &[])
                        .await
                    {
                        Ok(response) => {
                            let answer = response.text();
                            let _ = tx
                                .send(AgentEvent::BtwAnswer {
                                    question: question.clone(),
                                    answer: answer.clone(),
                                    usage: response.usage,
                                })
                                .await;
                            // The closing `End` event carries default (zero)
                            // usage; the real usage is on `BtwAnswer` above.
                            let _ = tx
                                .send(AgentEvent::End {
                                    text: answer,
                                    usage: crate::llm::TokenUsage::default(),
                                    meta: None,
                                })
                                .await;
                        }
                        Err(e) => {
                            let _ = tx
                                .send(AgentEvent::Error {
                                    message: format!("btw failed: {e}"),
                                })
                                .await;
                        }
                    }
                });
                return Ok((rx, handle));
            }

            // Any other command: emit its text as one delta followed by `End`.
            let handle = tokio::spawn(async move {
                let _ = tx
                    .send(AgentEvent::TextDelta {
                        text: output.text.clone(),
                    })
                    .await;
                let _ = tx
                    .send(AgentEvent::End {
                        text: output.text.clone(),
                        usage: crate::llm::TokenUsage::default(),
                        meta: None,
                    })
                    .await;
            });
            return Ok((rx, handle));
        }
    }

    let (tx, rx) = mpsc::channel(256);
    let (runtime_tx, mut runtime_rx) = mpsc::channel(256);
    let agent_loop = self.build_agent_loop();
    let effective_history = match history {
        Some(h) => h.to_vec(),
        None => read_or_recover(&self.history).clone(),
    };
    // Owned copies, because the spawned task cannot borrow from `self` or the caller.
    let prompt = prompt.to_string();
    let session_id = self.session_id.clone();

    // Publish the token so `cancel` can interrupt this run.
    let cancel_token = tokio_util::sync::CancellationToken::new();
    *self.cancel_token.lock().await = Some(cancel_token.clone());
    let token_clone = cancel_token.clone();
    let runtime_state = Arc::clone(&self.active_tools);
    // Track tool state for every event; stop forwarding (but keep tracking)
    // once the caller has dropped its receiver.
    let forwarder = tokio::spawn(async move {
        let mut forward_enabled = true;
        while let Some(event) = runtime_rx.recv().await {
            AgentSession::apply_runtime_event(&runtime_state, &event).await;
            if forward_enabled && tx.send(event).await.is_err() {
                forward_enabled = false;
            }
        }
    });

    let handle = tokio::spawn(async move {
        let _ = agent_loop
            .execute_with_session(
                &effective_history,
                &prompt,
                Some(&session_id),
                Some(runtime_tx),
                Some(&token_clone),
            )
            .await;
    });

    let cancel_token_ref = self.cancel_token.clone();
    let active_tools = Arc::clone(&self.active_tools);
    // Cleanup runs after both the loop and the forwarder have finished:
    // drop the published cancel token and clear tool tracking.
    let wrapped_handle = tokio::spawn(async move {
        let _ = handle.await;
        let _ = forwarder.await;
        *cancel_token_ref.lock().await = None;
        active_tools.write().await.clear();
    });

    Ok((rx, wrapped_handle))
}
2326
2327 pub async fn cancel(&self) -> bool {
2334 let token = self.cancel_token.lock().await.clone();
2335 if let Some(token) = token {
2336 token.cancel();
2337 tracing::info!(session_id = %self.session_id, "Cancelled ongoing operation");
2338 true
2339 } else {
2340 tracing::debug!(session_id = %self.session_id, "No ongoing operation to cancel");
2341 false
2342 }
2343 }
2344
/// Returns a cloned snapshot of the session's internal message history.
pub fn history(&self) -> Vec<Message> {
    read_or_recover(&self.history).clone()
}
2349
/// Returns the session's agent memory, or `None` when none is attached.
pub fn memory(&self) -> Option<&Arc<crate::memory::AgentMemory>> {
    self.memory.as_ref()
}
2354
/// Returns the session identifier (same value as `session_id()`).
pub fn id(&self) -> &str {
    &self.session_id
}
2359
/// Returns the workspace path this session operates on.
pub fn workspace(&self) -> &std::path::Path {
    &self.workspace
}
2364
/// Returns the warning recorded when the session was created, if any.
pub fn init_warning(&self) -> Option<&str> {
    self.init_warning.as_deref()
}
2369
/// Returns the session identifier (same value as `id()`).
pub fn session_id(&self) -> &str {
    &self.session_id
}
2374
/// Returns the tool definitions currently reported by the tool executor.
pub fn tool_definitions(&self) -> Vec<crate::llm::ToolDefinition> {
    self.tool_executor.definitions()
}
2383
2384 pub fn tool_names(&self) -> Vec<String> {
2390 self.tool_executor
2391 .definitions()
2392 .into_iter()
2393 .map(|t| t.name)
2394 .collect()
2395 }
2396
/// Returns the session's task manager.
pub fn task_manager(&self) -> &Arc<TaskManager> {
    &self.task_manager
}
2408
/// Hands `task` to the session's task manager and returns the id it assigns.
pub fn spawn_task(&self, task: crate::task::Task) -> crate::task::TaskId {
    self.task_manager.spawn(task)
}
2426
2427 pub fn track_tool_call(&self, tool_name: &str, args_summary: &str, success: bool) {
2431 if let Ok(mut guard) = self.progress_tracker.try_write() {
2432 guard.track_tool_call(tool_name, args_summary, success);
2433 }
2434 }
2435
/// Returns the progress currently reported by the session's progress tracker.
pub async fn get_progress(&self) -> crate::task::AgentProgress {
    self.progress_tracker.read().await.progress()
}
2442
/// Subscribes to events of a single task via the task manager.
///
/// Returns whatever `TaskManager::subscribe` returns for `task_id`
/// (presumably `None` when the task is unknown — confirm against `TaskManager`).
pub fn subscribe_tasks(
    &self,
    task_id: crate::task::TaskId,
) -> Option<tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent>> {
    self.task_manager.subscribe(task_id)
}
2452
/// Subscribes to the task manager's event stream covering all tasks.
pub fn subscribe_all_tasks(
    &self,
) -> tokio::sync::broadcast::Receiver<crate::task::manager::TaskEvent> {
    self.task_manager.subscribe_all()
}
2459
/// Registers `hook` on the session's built-in hook engine.
///
/// Note: agent loops use `ahp_executor` instead of this engine when one is configured.
pub fn register_hook(&self, hook: crate::hooks::Hook) {
    self.hook_engine.register(hook);
}
2468
/// Removes the hook with `hook_id` from the built-in hook engine.
///
/// Returns the engine's result (presumably the removed hook, or `None` when
/// no such hook was registered — confirm against `HookEngine::unregister`).
pub fn unregister_hook(&self, hook_id: &str) -> Option<crate::hooks::Hook> {
    self.hook_engine.unregister(hook_id)
}
2473
/// Registers `handler` for the hook identified by `hook_id` on the built-in hook engine.
pub fn register_hook_handler(
    &self,
    hook_id: &str,
    handler: Arc<dyn crate::hooks::HookHandler>,
) {
    self.hook_engine.register_handler(hook_id, handler);
}
2482
/// Removes the handler registered for `hook_id` from the built-in hook engine.
pub fn unregister_hook_handler(&self, hook_id: &str) {
    self.hook_engine.unregister_handler(hook_id);
}
2487
/// Returns the hook count reported by the built-in hook engine.
pub fn hook_count(&self) -> usize {
    self.hook_engine.hook_count()
}
2492
/// Persists the session to the configured session store.
///
/// Does nothing and returns `Ok(())` when no store is configured. The saved
/// record contains the session id, a snapshot of the internal history, the
/// workspace, the built system prompt, the planning/goal-tracking settings
/// and the tool names from `config.tools`; the remaining fields are written
/// with fixed placeholder values as listed below.
///
/// # Errors
///
/// Returns the store's error when saving fails.
pub async fn save(&self) -> Result<()> {
    let store = match &self.session_store {
        Some(s) => s,
        None => return Ok(()),
    };

    // Snapshot the history; the read guard is released at the end of the statement.
    let history = read_or_recover(&self.history).clone();
    let now = chrono::Utc::now().timestamp();

    let data = crate::store::SessionData {
        id: self.session_id.clone(),
        config: crate::session::SessionConfig {
            name: String::new(),
            workspace: self.workspace.display().to_string(),
            system_prompt: Some(self.config.prompt_slots.build()),
            max_context_length: 200_000,
            auto_compact: false,
            auto_compact_threshold: crate::session::DEFAULT_AUTO_COMPACT_THRESHOLD,
            storage_type: crate::config::StorageBackend::File,
            queue_config: None,
            confirmation_policy: None,
            permission_policy: None,
            parent_id: None,
            security_config: None,
            hook_engine: None,
            planning_mode: self.config.planning_mode,
            goal_tracking: self.config.goal_tracking,
        },
        state: crate::session::SessionState::Active,
        messages: history,
        // Usage and cost are not tracked here; defaults are written.
        context_usage: crate::session::ContextUsage::default(),
        total_usage: crate::llm::TokenUsage::default(),
        total_cost: 0.0,
        model_name: None,
        cost_records: Vec::new(),
        tool_names: crate::store::SessionData::tool_names_from_definitions(&self.config.tools),
        thinking_enabled: false,
        thinking_budget: None,
        // Both timestamps are set to the time of this save.
        created_at: now,
        updated_at: now,
        llm_config: None,
        tasks: Vec::new(),
        parent_id: None,
    };

    store.save(&data).await?;
    tracing::debug!("Session {} saved", self.session_id);
    Ok(())
}
2545
/// Runs the `read` tool on `path` and returns the tool's output.
///
/// # Errors
///
/// Returns the tool executor's error when execution fails.
pub async fn read_file(&self, path: &str) -> Result<String> {
    let args = serde_json::json!({ "file_path": path });
    let result = self.tool_executor.execute("read", &args).await?;
    Ok(result.output)
}
2552
/// Runs the `bash` tool with `command` and returns the tool's output.
///
/// Unlike the other direct tool helpers, this one executes with the
/// session's `tool_context`.
///
/// # Errors
///
/// Returns the tool executor's error when execution fails.
pub async fn bash(&self, command: &str) -> Result<String> {
    let args = serde_json::json!({ "command": command });
    let result = self
        .tool_executor
        .execute_with_context("bash", &args, &self.tool_context)
        .await?;
    Ok(result.output)
}
2565
2566 pub async fn glob(&self, pattern: &str) -> Result<Vec<String>> {
2568 let args = serde_json::json!({ "pattern": pattern });
2569 let result = self.tool_executor.execute("glob", &args).await?;
2570 let files: Vec<String> = result
2571 .output
2572 .lines()
2573 .filter(|l| !l.is_empty())
2574 .map(|l| l.to_string())
2575 .collect();
2576 Ok(files)
2577 }
2578
/// Runs the `grep` tool with `pattern` and returns the tool's raw output.
///
/// # Errors
///
/// Returns the tool executor's error when execution fails.
pub async fn grep(&self, pattern: &str) -> Result<String> {
    let args = serde_json::json!({ "pattern": pattern });
    let result = self.tool_executor.execute("grep", &args).await?;
    Ok(result.output)
}
2585
/// Runs the tool called `name` with `args` and returns its output, exit code
/// and metadata.
///
/// # Errors
///
/// Returns the tool executor's error when execution fails.
pub async fn tool(&self, name: &str, args: serde_json::Value) -> Result<ToolCallResult> {
    let result = self.tool_executor.execute(name, &args).await?;
    Ok(ToolCallResult {
        name: name.to_string(),
        output: result.output,
        exit_code: result.exit_code,
        metadata: result.metadata,
    })
}
2596
/// Returns `true` when this session has a lane queue.
pub fn has_queue(&self) -> bool {
    self.command_queue.is_some()
}
2605
/// Sets the handler configuration for `lane` on the session queue.
///
/// Does nothing when the session has no queue.
pub async fn set_lane_handler(&self, lane: SessionLane, config: LaneHandlerConfig) {
    if let Some(ref queue) = self.command_queue {
        queue.set_lane_handler(lane, config).await;
    }
}
2614
2615 pub async fn complete_external_task(&self, task_id: &str, result: ExternalTaskResult) -> bool {
2619 if let Some(ref queue) = self.command_queue {
2620 queue.complete_external_task(task_id, result).await
2621 } else {
2622 false
2623 }
2624 }
2625
2626 pub async fn pending_external_tasks(&self) -> Vec<ExternalTask> {
2628 if let Some(ref queue) = self.command_queue {
2629 queue.pending_external_tasks().await
2630 } else {
2631 Vec::new()
2632 }
2633 }
2634
2635 pub async fn queue_stats(&self) -> SessionQueueStats {
2637 if let Some(ref queue) = self.command_queue {
2638 queue.stats().await
2639 } else {
2640 SessionQueueStats::default()
2641 }
2642 }
2643
2644 pub async fn queue_metrics(&self) -> Option<MetricsSnapshot> {
2646 if let Some(ref queue) = self.command_queue {
2647 queue.metrics_snapshot().await
2648 } else {
2649 None
2650 }
2651 }
2652
2653 pub async fn submit(
2659 &self,
2660 lane: SessionLane,
2661 command: Box<dyn crate::queue::SessionCommand>,
2662 ) -> anyhow::Result<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>> {
2663 let queue = self
2664 .command_queue
2665 .as_ref()
2666 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2667 Ok(queue.submit(lane, command).await)
2668 }
2669
2670 pub async fn submit_batch(
2677 &self,
2678 lane: SessionLane,
2679 commands: Vec<Box<dyn crate::queue::SessionCommand>>,
2680 ) -> anyhow::Result<Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>>
2681 {
2682 let queue = self
2683 .command_queue
2684 .as_ref()
2685 .ok_or_else(|| anyhow::anyhow!("No queue configured on this session"))?;
2686 Ok(queue.submit_batch(lane, commands).await)
2687 }
2688
2689 pub async fn dead_letters(&self) -> Vec<DeadLetter> {
2691 if let Some(ref queue) = self.command_queue {
2692 queue.dead_letters().await
2693 } else {
2694 Vec::new()
2695 }
2696 }
2697
2698 pub fn register_agent_dir(&self, dir: &std::path::Path) -> usize {
2711 use crate::subagent::load_agents_from_dir;
2712 let agents = load_agents_from_dir(dir);
2713 let count = agents.len();
2714 for agent in agents {
2715 tracing::info!(
2716 session_id = %self.session_id,
2717 agent = agent.name,
2718 dir = %dir.display(),
2719 "Dynamically registered agent"
2720 );
2721 self.agent_registry.register(agent);
2722 }
2723 count
2724 }
2725
2726 pub async fn add_mcp_server(
2733 &self,
2734 config: crate::mcp::McpServerConfig,
2735 ) -> crate::error::Result<usize> {
2736 let server_name = config.name.clone();
2737 self.mcp_manager.register_server(config).await;
2738 self.mcp_manager.connect(&server_name).await.map_err(|e| {
2739 crate::error::CodeError::Tool {
2740 tool: server_name.clone(),
2741 message: format!("Failed to connect MCP server: {}", e),
2742 }
2743 })?;
2744
2745 let tools = self.mcp_manager.get_server_tools(&server_name).await;
2746 let count = tools.len();
2747
2748 for tool in
2749 crate::mcp::tools::create_mcp_tools(&server_name, tools, Arc::clone(&self.mcp_manager))
2750 {
2751 self.tool_executor.register_dynamic_tool(tool);
2752 }
2753
2754 tracing::info!(
2755 session_id = %self.session_id,
2756 server = server_name,
2757 tools = count,
2758 "MCP server added to live session"
2759 );
2760
2761 Ok(count)
2762 }
2763
2764 pub async fn remove_mcp_server(&self, server_name: &str) -> crate::error::Result<()> {
2769 self.tool_executor
2770 .unregister_tools_by_prefix(&format!("mcp__{server_name}__"));
2771 self.mcp_manager
2772 .disconnect(server_name)
2773 .await
2774 .map_err(|e| crate::error::CodeError::Tool {
2775 tool: server_name.to_string(),
2776 message: format!("Failed to disconnect MCP server: {}", e),
2777 })?;
2778 tracing::info!(
2779 session_id = %self.session_id,
2780 server = server_name,
2781 "MCP server removed from live session"
2782 );
2783 Ok(())
2784 }
2785
2786 pub async fn mcp_status(
2788 &self,
2789 ) -> std::collections::HashMap<String, crate::mcp::McpServerStatus> {
2790 self.mcp_manager.get_status().await
2791 }
2792}
2793
2794#[cfg(test)]
2799mod tests {
2800 use super::*;
2801 use crate::config::{ModelConfig, ModelModalities, ProviderConfig};
2802 use crate::store::SessionStore;
2803
2804 #[tokio::test]
2805 async fn test_session_submit_no_queue_returns_err() {
2806 let agent = Agent::from_config(test_config()).await.unwrap();
2807 let session = agent.session(".", None).unwrap();
2808 struct Noop;
2809 #[async_trait::async_trait]
2810 impl crate::queue::SessionCommand for Noop {
2811 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2812 Ok(serde_json::json!(null))
2813 }
2814 fn command_type(&self) -> &str {
2815 "noop"
2816 }
2817 }
2818 let result: anyhow::Result<
2819 tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>,
2820 > = session.submit(SessionLane::Query, Box::new(Noop)).await;
2821 assert!(result.is_err());
2822 assert!(result.unwrap_err().to_string().contains("No queue"));
2823 }
2824
2825 #[tokio::test]
2826 async fn test_session_submit_batch_no_queue_returns_err() {
2827 let agent = Agent::from_config(test_config()).await.unwrap();
2828 let session = agent.session(".", None).unwrap();
2829 struct Noop;
2830 #[async_trait::async_trait]
2831 impl crate::queue::SessionCommand for Noop {
2832 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
2833 Ok(serde_json::json!(null))
2834 }
2835 fn command_type(&self) -> &str {
2836 "noop"
2837 }
2838 }
2839 let cmds: Vec<Box<dyn crate::queue::SessionCommand>> = vec![Box::new(Noop)];
2840 let result: anyhow::Result<
2841 Vec<tokio::sync::oneshot::Receiver<anyhow::Result<serde_json::Value>>>,
2842 > = session.submit_batch(SessionLane::Query, cmds).await;
2843 assert!(result.is_err());
2844 assert!(result.unwrap_err().to_string().contains("No queue"));
2845 }
2846
2847 fn test_config() -> CodeConfig {
2848 CodeConfig {
2849 default_model: Some("anthropic/claude-sonnet-4-20250514".to_string()),
2850 providers: vec![
2851 ProviderConfig {
2852 name: "anthropic".to_string(),
2853 api_key: Some("test-key".to_string()),
2854 base_url: None,
2855 headers: std::collections::HashMap::new(),
2856 session_id_header: None,
2857 models: vec![ModelConfig {
2858 id: "claude-sonnet-4-20250514".to_string(),
2859 name: "Claude Sonnet 4".to_string(),
2860 family: "claude-sonnet".to_string(),
2861 api_key: None,
2862 base_url: None,
2863 headers: std::collections::HashMap::new(),
2864 session_id_header: None,
2865 attachment: false,
2866 reasoning: false,
2867 tool_call: true,
2868 temperature: true,
2869 release_date: None,
2870 modalities: ModelModalities::default(),
2871 cost: Default::default(),
2872 limit: Default::default(),
2873 }],
2874 },
2875 ProviderConfig {
2876 name: "openai".to_string(),
2877 api_key: Some("test-openai-key".to_string()),
2878 base_url: None,
2879 headers: std::collections::HashMap::new(),
2880 session_id_header: None,
2881 models: vec![ModelConfig {
2882 id: "gpt-4o".to_string(),
2883 name: "GPT-4o".to_string(),
2884 family: "gpt-4".to_string(),
2885 api_key: None,
2886 base_url: None,
2887 headers: std::collections::HashMap::new(),
2888 session_id_header: None,
2889 attachment: false,
2890 reasoning: false,
2891 tool_call: true,
2892 temperature: true,
2893 release_date: None,
2894 modalities: ModelModalities::default(),
2895 cost: Default::default(),
2896 limit: Default::default(),
2897 }],
2898 },
2899 ],
2900 ..Default::default()
2901 }
2902 }
2903
2904 fn build_effective_registry_for_test(
2905 agent_registry: Option<Arc<crate::skills::SkillRegistry>>,
2906 opts: &SessionOptions,
2907 ) -> Arc<crate::skills::SkillRegistry> {
2908 let base_registry = agent_registry
2909 .as_deref()
2910 .map(|r| r.fork())
2911 .unwrap_or_else(crate::skills::SkillRegistry::with_builtins);
2912 if let Some(ref r) = opts.skill_registry {
2913 for skill in r.all() {
2914 base_registry.register_unchecked(skill);
2915 }
2916 }
2917 for dir in &opts.skill_dirs {
2918 if let Err(e) = base_registry.load_from_dir(dir) {
2919 tracing::warn!(
2920 dir = %dir.display(),
2921 error = %e,
2922 "Failed to load session skill dir — skipping"
2923 );
2924 }
2925 }
2926 Arc::new(base_registry)
2927 }
2928
2929 #[tokio::test]
2930 async fn test_from_config() {
2931 let agent = Agent::from_config(test_config()).await;
2932 assert!(agent.is_ok());
2933 }
2934
2935 #[tokio::test]
2936 async fn test_session_default() {
2937 let agent = Agent::from_config(test_config()).await.unwrap();
2938 let session = agent.session("/tmp/test-workspace", None);
2939 assert!(session.is_ok());
2940 let debug = format!("{:?}", session.unwrap());
2941 assert!(debug.contains("AgentSession"));
2942 }
2943
2944 #[tokio::test]
2945 async fn test_session_registers_agentic_tools_by_default() {
2946 let agent = Agent::from_config(test_config()).await.unwrap();
2947 let session = agent.session("/tmp/test-workspace", None).unwrap();
2948 let tool_names = session.tool_names();
2949
2950 assert!(tool_names.iter().any(|name| name == "agentic_search"));
2951 assert!(tool_names.iter().any(|name| name == "agentic_parse"));
2952 }
2953
2954 #[tokio::test]
2955 async fn test_session_can_disable_agentic_tools_via_config() {
2956 let mut config = test_config();
2957 config.agentic_search = Some(crate::config::AgenticSearchConfig {
2958 enabled: false,
2959 ..Default::default()
2960 });
2961 config.agentic_parse = Some(crate::config::AgenticParseConfig {
2962 enabled: false,
2963 ..Default::default()
2964 });
2965
2966 let agent = Agent::from_config(config).await.unwrap();
2967 let session = agent.session("/tmp/test-workspace", None).unwrap();
2968 let tool_names = session.tool_names();
2969
2970 assert!(!tool_names.iter().any(|name| name == "agentic_search"));
2971 assert!(!tool_names.iter().any(|name| name == "agentic_parse"));
2972 }
2973
2974 #[tokio::test]
2975 async fn test_session_with_model_override() {
2976 let agent = Agent::from_config(test_config()).await.unwrap();
2977 let opts = SessionOptions::new().with_model("openai/gpt-4o");
2978 let session = agent.session("/tmp/test-workspace", Some(opts));
2979 assert!(session.is_ok());
2980 }
2981
2982 #[tokio::test]
2983 async fn test_session_with_invalid_model_format() {
2984 let agent = Agent::from_config(test_config()).await.unwrap();
2985 let opts = SessionOptions::new().with_model("gpt-4o");
2986 let session = agent.session("/tmp/test-workspace", Some(opts));
2987 assert!(session.is_err());
2988 }
2989
/// `with_document_ocr_provider` stores the supplied provider on the options.
#[test]
fn test_session_options_with_document_ocr_provider() {
    // Provider stub: reports a fixed name and never produces OCR text.
    struct MockOcrProvider;

    impl crate::document_ocr::DocumentOcrProvider for MockOcrProvider {
        fn name(&self) -> &str {
            "mock-ocr"
        }

        fn ocr_pdf(
            &self,
            _path: &std::path::Path,
            _config: &crate::config::DocumentOcrConfig,
        ) -> anyhow::Result<Option<String>> {
            Ok(None)
        }
    }

    let provider = Arc::new(MockOcrProvider);
    let configured = SessionOptions::new().with_document_ocr_provider(provider);
    assert!(configured.document_ocr_provider.is_some());
}
3011
/// `with_document_parser_registry` stores the exact `Arc` it was given.
#[test]
fn test_session_options_with_document_parser_registry() {
    let registry = Arc::new(crate::document_parser::DocumentParserRegistry::empty());
    let opts = SessionOptions::new().with_document_parser_registry(Arc::clone(&registry));
    assert!(opts.document_parser_registry.is_some());
    // Pointer equality proves the options hold the same allocation, not a copy.
    assert!(Arc::ptr_eq(
        opts.document_parser_registry.as_ref().unwrap(),
        &registry
    ));
}
3022
3023 #[tokio::test]
3024 async fn test_session_with_model_not_found() {
3025 let agent = Agent::from_config(test_config()).await.unwrap();
3026 let opts = SessionOptions::new().with_model("openai/nonexistent");
3027 let session = agent.session("/tmp/test-workspace", Some(opts));
3028 assert!(session.is_err());
3029 }
3030
3031 #[tokio::test]
3032 async fn test_session_preserves_skill_scorer_from_agent_registry() {
3033 use crate::skills::feedback::{
3034 DefaultSkillScorer, SkillFeedback, SkillOutcome, SkillScorer,
3035 };
3036 use crate::skills::{Skill, SkillKind, SkillRegistry};
3037
3038 let registry = Arc::new(SkillRegistry::new());
3039 let scorer = Arc::new(DefaultSkillScorer::default());
3040 registry.set_scorer(scorer.clone());
3041
3042 registry.register_unchecked(Arc::new(Skill {
3043 name: "healthy-skill".to_string(),
3044 description: "healthy".to_string(),
3045 allowed_tools: None,
3046 disable_model_invocation: false,
3047 kind: SkillKind::Instruction,
3048 content: "healthy".to_string(),
3049 tags: vec![],
3050 version: None,
3051 }));
3052 registry.register_unchecked(Arc::new(Skill {
3053 name: "disabled-skill".to_string(),
3054 description: "disabled".to_string(),
3055 allowed_tools: None,
3056 disable_model_invocation: false,
3057 kind: SkillKind::Instruction,
3058 content: "disabled".to_string(),
3059 tags: vec![],
3060 version: None,
3061 }));
3062
3063 for _ in 0..5 {
3064 scorer.record(SkillFeedback {
3065 skill_name: "disabled-skill".to_string(),
3066 outcome: SkillOutcome::Failure,
3067 score_delta: -1.0,
3068 reason: "bad".to_string(),
3069 timestamp: 0,
3070 });
3071 }
3072
3073 let effective_registry =
3074 build_effective_registry_for_test(Some(registry), &SessionOptions::new());
3075 let prompt = effective_registry.to_system_prompt();
3076
3077 assert!(prompt.contains("healthy-skill"));
3078 assert!(!prompt.contains("disabled-skill"));
3079 }
3080
3081 #[tokio::test]
3082 async fn test_session_skill_dirs_preserve_agent_registry_validator() {
3083 use crate::skills::validator::DefaultSkillValidator;
3084 use crate::skills::SkillRegistry;
3085
3086 let registry = Arc::new(SkillRegistry::new());
3087 registry.set_validator(Arc::new(DefaultSkillValidator::default()));
3088
3089 let temp_dir = tempfile::tempdir().unwrap();
3090 let invalid_skill = temp_dir.path().join("invalid.md");
3091 std::fs::write(
3092 &invalid_skill,
3093 r#"---
3094name: BadName
3095description: "invalid skill name"
3096kind: instruction
3097---
3098# Invalid Skill
3099"#,
3100 )
3101 .unwrap();
3102
3103 let opts = SessionOptions::new().with_skill_dirs([temp_dir.path()]);
3104 let effective_registry = build_effective_registry_for_test(Some(registry), &opts);
3105 assert!(effective_registry.get("BadName").is_none());
3106 }
3107
3108 #[tokio::test]
3109 async fn test_session_skill_registry_overrides_agent_registry_without_polluting_parent() {
3110 use crate::skills::{Skill, SkillKind, SkillRegistry};
3111
3112 let registry = Arc::new(SkillRegistry::new());
3113 registry.register_unchecked(Arc::new(Skill {
3114 name: "shared-skill".to_string(),
3115 description: "agent level".to_string(),
3116 allowed_tools: None,
3117 disable_model_invocation: false,
3118 kind: SkillKind::Instruction,
3119 content: "agent content".to_string(),
3120 tags: vec![],
3121 version: None,
3122 }));
3123
3124 let session_registry = Arc::new(SkillRegistry::new());
3125 session_registry.register_unchecked(Arc::new(Skill {
3126 name: "shared-skill".to_string(),
3127 description: "session level".to_string(),
3128 allowed_tools: None,
3129 disable_model_invocation: false,
3130 kind: SkillKind::Instruction,
3131 content: "session content".to_string(),
3132 tags: vec![],
3133 version: None,
3134 }));
3135
3136 let opts = SessionOptions::new().with_skill_registry(session_registry);
3137 let effective_registry = build_effective_registry_for_test(Some(registry.clone()), &opts);
3138
3139 assert_eq!(
3140 effective_registry.get("shared-skill").unwrap().content,
3141 "session content"
3142 );
3143 assert_eq!(
3144 registry.get("shared-skill").unwrap().content,
3145 "agent content"
3146 );
3147 }
3148
3149 #[tokio::test]
3150 async fn test_session_skill_dirs_override_session_registry_and_skip_invalid_entries() {
3151 use crate::skills::{Skill, SkillKind, SkillRegistry};
3152
3153 let session_registry = Arc::new(SkillRegistry::new());
3154 session_registry.register_unchecked(Arc::new(Skill {
3155 name: "shared-skill".to_string(),
3156 description: "session registry".to_string(),
3157 allowed_tools: None,
3158 disable_model_invocation: false,
3159 kind: SkillKind::Instruction,
3160 content: "registry content".to_string(),
3161 tags: vec![],
3162 version: None,
3163 }));
3164
3165 let temp_dir = tempfile::tempdir().unwrap();
3166 std::fs::write(
3167 temp_dir.path().join("shared.md"),
3168 r#"---
3169name: shared-skill
3170description: "skill dir override"
3171kind: instruction
3172---
3173# Shared Skill
3174dir content
3175"#,
3176 )
3177 .unwrap();
3178 std::fs::write(temp_dir.path().join("README.md"), "# not a skill").unwrap();
3179
3180 let opts = SessionOptions::new()
3181 .with_skill_registry(session_registry)
3182 .with_skill_dirs([temp_dir.path()]);
3183 let effective_registry = build_effective_registry_for_test(None, &opts);
3184
3185 assert_eq!(
3186 effective_registry.get("shared-skill").unwrap().description,
3187 "skill dir override"
3188 );
3189 assert!(effective_registry.get("README").is_none());
3190 }
3191
3192 #[tokio::test]
3193 async fn test_session_plugin_skills_are_loaded_into_session_registry_only() {
3194 use crate::plugin::{Plugin, PluginContext};
3195 use crate::skills::{Skill, SkillKind, SkillRegistry};
3196 use crate::tools::ToolRegistry;
3197
3198 struct SessionOnlySkillPlugin;
3199
3200 impl Plugin for SessionOnlySkillPlugin {
3201 fn name(&self) -> &str {
3202 "session-only-skill"
3203 }
3204
3205 fn version(&self) -> &str {
3206 "0.1.0"
3207 }
3208
3209 fn tool_names(&self) -> &[&str] {
3210 &[]
3211 }
3212
3213 fn load(
3214 &self,
3215 _registry: &Arc<ToolRegistry>,
3216 _ctx: &PluginContext,
3217 ) -> anyhow::Result<()> {
3218 Ok(())
3219 }
3220
3221 fn skills(&self) -> Vec<Arc<Skill>> {
3222 vec![Arc::new(Skill {
3223 name: "plugin-session-skill".to_string(),
3224 description: "plugin skill".to_string(),
3225 allowed_tools: None,
3226 disable_model_invocation: false,
3227 kind: SkillKind::Instruction,
3228 content: "plugin content".to_string(),
3229 tags: vec!["plugin".to_string()],
3230 version: None,
3231 })]
3232 }
3233 }
3234
3235 let mut agent = Agent::from_config(test_config()).await.unwrap();
3236 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3237 agent.config.skill_registry = Some(Arc::clone(&agent_registry));
3238
3239 let opts = SessionOptions::new().with_plugin(SessionOnlySkillPlugin);
3240 let session = agent.session("/tmp/test-workspace", Some(opts)).unwrap();
3241
3242 let session_registry = session.config.skill_registry.as_ref().unwrap();
3243 assert!(session_registry.get("plugin-session-skill").is_some());
3244 assert!(agent_registry.get("plugin-session-skill").is_none());
3245 }
3246
3247 #[tokio::test]
3248 async fn test_session_specific_skills_do_not_leak_across_sessions() {
3249 use crate::skills::{Skill, SkillKind, SkillRegistry};
3250
3251 let mut agent = Agent::from_config(test_config()).await.unwrap();
3252 let agent_registry = Arc::new(SkillRegistry::with_builtins());
3253 agent.config.skill_registry = Some(agent_registry);
3254
3255 let session_registry = Arc::new(SkillRegistry::new());
3256 session_registry.register_unchecked(Arc::new(Skill {
3257 name: "session-only".to_string(),
3258 description: "only for first session".to_string(),
3259 allowed_tools: None,
3260 disable_model_invocation: false,
3261 kind: SkillKind::Instruction,
3262 content: "session one".to_string(),
3263 tags: vec![],
3264 version: None,
3265 }));
3266
3267 let session_one = agent
3268 .session(
3269 "/tmp/test-workspace",
3270 Some(SessionOptions::new().with_skill_registry(session_registry)),
3271 )
3272 .unwrap();
3273 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3274
3275 assert!(session_one
3276 .config
3277 .skill_registry
3278 .as_ref()
3279 .unwrap()
3280 .get("session-only")
3281 .is_some());
3282 assert!(session_two
3283 .config
3284 .skill_registry
3285 .as_ref()
3286 .unwrap()
3287 .get("session-only")
3288 .is_none());
3289 }
3290
3291 #[tokio::test]
3292 async fn test_plugin_skills_do_not_leak_across_sessions() {
3293 use crate::plugin::{Plugin, PluginContext};
3294 use crate::skills::{Skill, SkillKind, SkillRegistry};
3295 use crate::tools::ToolRegistry;
3296
3297 struct LeakyPlugin;
3298
3299 impl Plugin for LeakyPlugin {
3300 fn name(&self) -> &str {
3301 "leaky-plugin"
3302 }
3303
3304 fn version(&self) -> &str {
3305 "0.1.0"
3306 }
3307
3308 fn tool_names(&self) -> &[&str] {
3309 &[]
3310 }
3311
3312 fn load(
3313 &self,
3314 _registry: &Arc<ToolRegistry>,
3315 _ctx: &PluginContext,
3316 ) -> anyhow::Result<()> {
3317 Ok(())
3318 }
3319
3320 fn skills(&self) -> Vec<Arc<Skill>> {
3321 vec![Arc::new(Skill {
3322 name: "plugin-only".to_string(),
3323 description: "plugin only".to_string(),
3324 allowed_tools: None,
3325 disable_model_invocation: false,
3326 kind: SkillKind::Instruction,
3327 content: "plugin skill".to_string(),
3328 tags: vec![],
3329 version: None,
3330 })]
3331 }
3332 }
3333
3334 let mut agent = Agent::from_config(test_config()).await.unwrap();
3335 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3336
3337 let session_one = agent
3338 .session(
3339 "/tmp/test-workspace",
3340 Some(SessionOptions::new().with_plugin(LeakyPlugin)),
3341 )
3342 .unwrap();
3343 let session_two = agent.session("/tmp/test-workspace", None).unwrap();
3344
3345 assert!(session_one
3346 .config
3347 .skill_registry
3348 .as_ref()
3349 .unwrap()
3350 .get("plugin-only")
3351 .is_some());
3352 assert!(session_two
3353 .config
3354 .skill_registry
3355 .as_ref()
3356 .unwrap()
3357 .get("plugin-only")
3358 .is_none());
3359 }
3360
3361 #[tokio::test]
3362 async fn test_session_for_agent_applies_definition_and_keeps_skill_overrides_isolated() {
3363 use crate::skills::{Skill, SkillKind, SkillRegistry};
3364 use crate::subagent::AgentDefinition;
3365
3366 let mut agent = Agent::from_config(test_config()).await.unwrap();
3367 agent.config.skill_registry = Some(Arc::new(SkillRegistry::with_builtins()));
3368
3369 let definition = AgentDefinition::new("reviewer", "Review code")
3370 .with_prompt("Agent definition prompt")
3371 .with_max_steps(7);
3372
3373 let session_registry = Arc::new(SkillRegistry::new());
3374 session_registry.register_unchecked(Arc::new(Skill {
3375 name: "agent-session-skill".to_string(),
3376 description: "agent session only".to_string(),
3377 allowed_tools: None,
3378 disable_model_invocation: false,
3379 kind: SkillKind::Instruction,
3380 content: "agent session content".to_string(),
3381 tags: vec![],
3382 version: None,
3383 }));
3384
3385 let session_one = agent
3386 .session_for_agent(
3387 "/tmp/test-workspace",
3388 &definition,
3389 Some(SessionOptions::new().with_skill_registry(session_registry)),
3390 )
3391 .unwrap();
3392 let session_two = agent
3393 .session_for_agent("/tmp/test-workspace", &definition, None)
3394 .unwrap();
3395
3396 assert_eq!(session_one.config.max_tool_rounds, 7);
3397 let extra = session_one.config.prompt_slots.extra.as_deref().unwrap();
3398 assert!(extra.contains("Agent definition prompt"));
3399 assert!(extra.contains("agent-session-skill"));
3400 assert!(session_one
3401 .config
3402 .skill_registry
3403 .as_ref()
3404 .unwrap()
3405 .get("agent-session-skill")
3406 .is_some());
3407 assert!(session_two
3408 .config
3409 .skill_registry
3410 .as_ref()
3411 .unwrap()
3412 .get("agent-session-skill")
3413 .is_none());
3414 }
3415
3416 #[tokio::test]
3417 async fn test_session_for_agent_preserves_existing_prompt_slots_when_injecting_definition_prompt(
3418 ) {
3419 use crate::prompts::SystemPromptSlots;
3420 use crate::subagent::AgentDefinition;
3421
3422 let agent = Agent::from_config(test_config()).await.unwrap();
3423 let definition = AgentDefinition::new("planner", "Plan work")
3424 .with_prompt("Definition extra prompt")
3425 .with_max_steps(3);
3426
3427 let opts = SessionOptions::new().with_prompt_slots(SystemPromptSlots {
3428 style: None,
3429 role: Some("Custom role".to_string()),
3430 guidelines: None,
3431 response_style: None,
3432 extra: None,
3433 });
3434
3435 let session = agent
3436 .session_for_agent("/tmp/test-workspace", &definition, Some(opts))
3437 .unwrap();
3438
3439 assert_eq!(
3440 session.config.prompt_slots.role.as_deref(),
3441 Some("Custom role")
3442 );
3443 assert!(session
3444 .config
3445 .prompt_slots
3446 .extra
3447 .as_deref()
3448 .unwrap()
3449 .contains("Definition extra prompt"));
3450 assert_eq!(session.config.max_tool_rounds, 3);
3451 }
3452
3453 #[tokio::test]
3454 async fn test_new_with_hcl_string() {
3455 let hcl = r#"
3456 default_model = "anthropic/claude-sonnet-4-20250514"
3457 providers {
3458 name = "anthropic"
3459 api_key = "test-key"
3460 models {
3461 id = "claude-sonnet-4-20250514"
3462 name = "Claude Sonnet 4"
3463 }
3464 }
3465 "#;
3466 let agent = Agent::new(hcl).await;
3467 assert!(agent.is_ok());
3468 }
3469
3470 #[tokio::test]
3471 async fn test_create_alias_hcl() {
3472 let hcl = r#"
3473 default_model = "anthropic/claude-sonnet-4-20250514"
3474 providers {
3475 name = "anthropic"
3476 api_key = "test-key"
3477 models {
3478 id = "claude-sonnet-4-20250514"
3479 name = "Claude Sonnet 4"
3480 }
3481 }
3482 "#;
3483 let agent = Agent::create(hcl).await;
3484 assert!(agent.is_ok());
3485 }
3486
3487 #[tokio::test]
3488 async fn test_create_and_new_produce_same_result() {
3489 let hcl = r#"
3490 default_model = "anthropic/claude-sonnet-4-20250514"
3491 providers {
3492 name = "anthropic"
3493 api_key = "test-key"
3494 models {
3495 id = "claude-sonnet-4-20250514"
3496 name = "Claude Sonnet 4"
3497 }
3498 }
3499 "#;
3500 let agent_new = Agent::new(hcl).await;
3501 let agent_create = Agent::create(hcl).await;
3502 assert!(agent_new.is_ok());
3503 assert!(agent_create.is_ok());
3504
3505 let session_new = agent_new.unwrap().session("/tmp/test-ws-new", None);
3507 let session_create = agent_create.unwrap().session("/tmp/test-ws-create", None);
3508 assert!(session_new.is_ok());
3509 assert!(session_create.is_ok());
3510 }
3511
3512 #[tokio::test]
3513 async fn test_new_with_existing_hcl_file_uses_file_loading() {
3514 let temp_dir = tempfile::tempdir().unwrap();
3515 let config_path = temp_dir.path().join("agent.hcl");
3516 std::fs::write(&config_path, "this is not valid hcl").unwrap();
3517
3518 let err = Agent::new(config_path.display().to_string())
3519 .await
3520 .unwrap_err();
3521 let msg = err.to_string();
3522
3523 assert!(msg.contains("Failed to load config"));
3524 assert!(msg.contains("agent.hcl"));
3525 assert!(!msg.contains("Failed to parse config as HCL string"));
3526 }
3527
3528 #[tokio::test]
3529 async fn test_new_with_missing_hcl_file_reports_not_found() {
3530 let temp_dir = tempfile::tempdir().unwrap();
3531 let missing_path = temp_dir.path().join("agent.hcl");
3532
3533 let err = Agent::new(missing_path.display().to_string())
3534 .await
3535 .unwrap_err();
3536 let msg = err.to_string();
3537
3538 assert!(msg.contains("Config file not found"));
3539 assert!(msg.contains("agent.hcl"));
3540 assert!(!msg.contains("Failed to parse config as HCL string"));
3541 }
3542
/// `Agent::from_config` fails for a configuration that sets no default model
/// (the single provider here also lists no models).
#[test]
fn test_from_config_requires_default_model() {
    let runtime = tokio::runtime::Runtime::new().unwrap();

    let provider_without_models = ProviderConfig {
        name: "anthropic".to_string(),
        api_key: Some("test-key".to_string()),
        base_url: None,
        headers: std::collections::HashMap::new(),
        session_id_header: None,
        models: vec![],
    };
    let incomplete_config = CodeConfig {
        providers: vec![provider_without_models],
        ..Default::default()
    };

    let outcome = runtime.block_on(Agent::from_config(incomplete_config));
    assert!(outcome.is_err());
}
3560
3561 #[tokio::test]
3562 async fn test_history_empty_on_new_session() {
3563 let agent = Agent::from_config(test_config()).await.unwrap();
3564 let session = agent.session("/tmp/test-workspace", None).unwrap();
3565 assert!(session.history().is_empty());
3566 }
3567
3568 #[tokio::test]
3569 async fn test_session_options_with_agent_dir() {
3570 let opts = SessionOptions::new()
3571 .with_agent_dir("/tmp/agents")
3572 .with_agent_dir("/tmp/more-agents");
3573 assert_eq!(opts.agent_dirs.len(), 2);
3574 assert_eq!(opts.agent_dirs[0], PathBuf::from("/tmp/agents"));
3575 assert_eq!(opts.agent_dirs[1], PathBuf::from("/tmp/more-agents"));
3576 }
3577
/// `with_queue_config` stores the queue configuration; one built with
/// `with_lane_features` has DLQ, metrics and alerts on and a 60 s default timeout.
#[test]
fn test_session_options_with_queue_config() {
    let lane_features = SessionQueueConfig::default().with_lane_features();
    let configured = SessionOptions::new().with_queue_config(lane_features.clone());
    assert!(configured.queue_config.is_some());

    let stored = configured.queue_config.unwrap();
    assert!(stored.enable_dlq);
    assert!(stored.enable_metrics);
    assert!(stored.enable_alerts);
    assert_eq!(stored.default_timeout_ms, Some(60_000));
}
3594
3595 #[tokio::test(flavor = "multi_thread")]
3596 async fn test_session_with_queue_config() {
3597 let agent = Agent::from_config(test_config()).await.unwrap();
3598 let qc = SessionQueueConfig::default();
3599 let opts = SessionOptions::new().with_queue_config(qc);
3600 let session = agent.session("/tmp/test-workspace-queue", Some(opts));
3601 assert!(session.is_ok());
3602 let session = session.unwrap();
3603 assert!(session.has_queue());
3604 }
3605
3606 #[tokio::test]
3607 async fn test_session_without_queue_config() {
3608 let agent = Agent::from_config(test_config()).await.unwrap();
3609 let session = agent.session("/tmp/test-workspace-noqueue", None).unwrap();
3610 assert!(!session.has_queue());
3611 }
3612
3613 #[tokio::test]
3614 async fn test_session_queue_stats_without_queue() {
3615 let agent = Agent::from_config(test_config()).await.unwrap();
3616 let session = agent.session("/tmp/test-workspace-stats", None).unwrap();
3617 let stats = session.queue_stats().await;
3618 assert_eq!(stats.total_pending, 0);
3620 assert_eq!(stats.total_active, 0);
3621 }
3622
3623 #[tokio::test(flavor = "multi_thread")]
3624 async fn test_session_queue_stats_with_queue() {
3625 let agent = Agent::from_config(test_config()).await.unwrap();
3626 let qc = SessionQueueConfig::default();
3627 let opts = SessionOptions::new().with_queue_config(qc);
3628 let session = agent
3629 .session("/tmp/test-workspace-qstats", Some(opts))
3630 .unwrap();
3631 let stats = session.queue_stats().await;
3632 assert_eq!(stats.total_pending, 0);
3634 assert_eq!(stats.total_active, 0);
3635 }
3636
3637 #[tokio::test(flavor = "multi_thread")]
3638 async fn test_session_pending_external_tasks_empty() {
3639 let agent = Agent::from_config(test_config()).await.unwrap();
3640 let qc = SessionQueueConfig::default();
3641 let opts = SessionOptions::new().with_queue_config(qc);
3642 let session = agent
3643 .session("/tmp/test-workspace-ext", Some(opts))
3644 .unwrap();
3645 let tasks = session.pending_external_tasks().await;
3646 assert!(tasks.is_empty());
3647 }
3648
3649 #[tokio::test(flavor = "multi_thread")]
3650 async fn test_session_dead_letters_empty() {
3651 let agent = Agent::from_config(test_config()).await.unwrap();
3652 let qc = SessionQueueConfig::default().with_dlq(Some(100));
3653 let opts = SessionOptions::new().with_queue_config(qc);
3654 let session = agent
3655 .session("/tmp/test-workspace-dlq", Some(opts))
3656 .unwrap();
3657 let dead = session.dead_letters().await;
3658 assert!(dead.is_empty());
3659 }
3660
3661 #[tokio::test(flavor = "multi_thread")]
3662 async fn test_session_queue_metrics_disabled() {
3663 let agent = Agent::from_config(test_config()).await.unwrap();
3664 let qc = SessionQueueConfig::default();
3666 let opts = SessionOptions::new().with_queue_config(qc);
3667 let session = agent
3668 .session("/tmp/test-workspace-nomet", Some(opts))
3669 .unwrap();
3670 let metrics = session.queue_metrics().await;
3671 assert!(metrics.is_none());
3672 }
3673
3674 #[tokio::test(flavor = "multi_thread")]
3675 async fn test_session_queue_metrics_enabled() {
3676 let agent = Agent::from_config(test_config()).await.unwrap();
3677 let qc = SessionQueueConfig::default().with_metrics();
3678 let opts = SessionOptions::new().with_queue_config(qc);
3679 let session = agent
3680 .session("/tmp/test-workspace-met", Some(opts))
3681 .unwrap();
3682 let metrics = session.queue_metrics().await;
3683 assert!(metrics.is_some());
3684 }
3685
3686 #[tokio::test(flavor = "multi_thread")]
3687 async fn test_session_set_lane_handler() {
3688 let agent = Agent::from_config(test_config()).await.unwrap();
3689 let qc = SessionQueueConfig::default();
3690 let opts = SessionOptions::new().with_queue_config(qc);
3691 let session = agent
3692 .session("/tmp/test-workspace-handler", Some(opts))
3693 .unwrap();
3694
3695 session
3697 .set_lane_handler(
3698 SessionLane::Execute,
3699 LaneHandlerConfig {
3700 mode: crate::queue::TaskHandlerMode::External,
3701 timeout_ms: 30_000,
3702 },
3703 )
3704 .await;
3705
3706 }
3709
3710 #[tokio::test(flavor = "multi_thread")]
3715 async fn test_session_has_id() {
3716 let agent = Agent::from_config(test_config()).await.unwrap();
3717 let session = agent.session("/tmp/test-ws-id", None).unwrap();
3718 assert!(!session.session_id().is_empty());
3720 assert_eq!(session.session_id().len(), 36); }
3722
3723 #[tokio::test(flavor = "multi_thread")]
3724 async fn test_session_explicit_id() {
3725 let agent = Agent::from_config(test_config()).await.unwrap();
3726 let opts = SessionOptions::new().with_session_id("my-session-42");
3727 let session = agent.session("/tmp/test-ws-eid", Some(opts)).unwrap();
3728 assert_eq!(session.session_id(), "my-session-42");
3729 }
3730
3731 #[tokio::test(flavor = "multi_thread")]
3732 async fn test_session_save_no_store() {
3733 let agent = Agent::from_config(test_config()).await.unwrap();
3734 let session = agent.session("/tmp/test-ws-save", None).unwrap();
3735 session.save().await.unwrap();
3737 }
3738
3739 #[tokio::test(flavor = "multi_thread")]
3740 async fn test_session_save_and_load() {
3741 let store = Arc::new(crate::store::MemorySessionStore::new());
3742 let agent = Agent::from_config(test_config()).await.unwrap();
3743
3744 let opts = SessionOptions::new()
3745 .with_session_store(store.clone())
3746 .with_session_id("persist-test");
3747 let session = agent.session("/tmp/test-ws-persist", Some(opts)).unwrap();
3748
3749 session.save().await.unwrap();
3751
3752 assert!(store.exists("persist-test").await.unwrap());
3754
3755 let data = store.load("persist-test").await.unwrap().unwrap();
3756 assert_eq!(data.id, "persist-test");
3757 assert!(data.messages.is_empty());
3758 }
3759
3760 #[tokio::test(flavor = "multi_thread")]
3761 async fn test_session_save_with_history() {
3762 let store = Arc::new(crate::store::MemorySessionStore::new());
3763 let agent = Agent::from_config(test_config()).await.unwrap();
3764
3765 let opts = SessionOptions::new()
3766 .with_session_store(store.clone())
3767 .with_session_id("history-test");
3768 let session = agent.session("/tmp/test-ws-hist", Some(opts)).unwrap();
3769
3770 {
3772 let mut h = session.history.write().unwrap();
3773 h.push(Message::user("Hello"));
3774 h.push(Message::user("How are you?"));
3775 }
3776
3777 session.save().await.unwrap();
3778
3779 let data = store.load("history-test").await.unwrap().unwrap();
3780 assert_eq!(data.messages.len(), 2);
3781 }
3782
3783 #[tokio::test(flavor = "multi_thread")]
3784 async fn test_resume_session() {
3785 let store = Arc::new(crate::store::MemorySessionStore::new());
3786 let agent = Agent::from_config(test_config()).await.unwrap();
3787
3788 let opts = SessionOptions::new()
3790 .with_session_store(store.clone())
3791 .with_session_id("resume-test");
3792 let session = agent.session("/tmp/test-ws-resume", Some(opts)).unwrap();
3793 {
3794 let mut h = session.history.write().unwrap();
3795 h.push(Message::user("What is Rust?"));
3796 h.push(Message::user("Tell me more"));
3797 }
3798 session.save().await.unwrap();
3799
3800 let opts2 = SessionOptions::new().with_session_store(store.clone());
3802 let resumed = agent.resume_session("resume-test", opts2).unwrap();
3803
3804 assert_eq!(resumed.session_id(), "resume-test");
3805 let history = resumed.history();
3806 assert_eq!(history.len(), 2);
3807 assert_eq!(history[0].text(), "What is Rust?");
3808 }
3809
3810 #[tokio::test(flavor = "multi_thread")]
3811 async fn test_resume_session_not_found() {
3812 let store = Arc::new(crate::store::MemorySessionStore::new());
3813 let agent = Agent::from_config(test_config()).await.unwrap();
3814
3815 let opts = SessionOptions::new().with_session_store(store.clone());
3816 let result = agent.resume_session("nonexistent", opts);
3817 assert!(result.is_err());
3818 assert!(result.unwrap_err().to_string().contains("not found"));
3819 }
3820
3821 #[tokio::test(flavor = "multi_thread")]
3822 async fn test_resume_session_no_store() {
3823 let agent = Agent::from_config(test_config()).await.unwrap();
3824 let opts = SessionOptions::new();
3825 let result = agent.resume_session("any-id", opts);
3826 assert!(result.is_err());
3827 assert!(result.unwrap_err().to_string().contains("session_store"));
3828 }
3829
3830 #[tokio::test(flavor = "multi_thread")]
3831 async fn test_file_session_store_persistence() {
3832 let dir = tempfile::TempDir::new().unwrap();
3833 let store = Arc::new(
3834 crate::store::FileSessionStore::new(dir.path())
3835 .await
3836 .unwrap(),
3837 );
3838 let agent = Agent::from_config(test_config()).await.unwrap();
3839
3840 let opts = SessionOptions::new()
3842 .with_session_store(store.clone())
3843 .with_session_id("file-persist");
3844 let session = agent
3845 .session("/tmp/test-ws-file-persist", Some(opts))
3846 .unwrap();
3847 {
3848 let mut h = session.history.write().unwrap();
3849 h.push(Message::user("test message"));
3850 }
3851 session.save().await.unwrap();
3852
3853 let store2 = Arc::new(
3855 crate::store::FileSessionStore::new(dir.path())
3856 .await
3857 .unwrap(),
3858 );
3859 let data = store2.load("file-persist").await.unwrap().unwrap();
3860 assert_eq!(data.messages.len(), 1);
3861 }
3862
/// Checks that the `with_session_id` and `with_auto_save` builders record
/// their arguments on `SessionOptions`.
///
/// Nothing here awaits, so this is a plain synchronous `#[test]` instead of a
/// test that starts a multi-threaded Tokio runtime for no reason.
#[test]
fn test_session_options_builders() {
    let opts = SessionOptions::new()
        .with_session_id("test-id")
        .with_auto_save(true);
    // Compare by borrow to avoid allocating a `String` just for the assertion.
    assert_eq!(opts.session_id.as_deref(), Some("test-id"));
    assert!(opts.auto_save);
}
3871
/// `with_sandbox` must store the supplied `SandboxConfig` on the options,
/// keeping the overridden `image` and `memory_mb` values.
#[test]
fn test_session_options_with_sandbox_sets_config() {
    use crate::sandbox::SandboxConfig;

    let requested = SandboxConfig {
        image: "ubuntu:22.04".into(),
        memory_mb: 1024,
        ..SandboxConfig::default()
    };
    let stored = SessionOptions::new().with_sandbox(requested).sandbox_config;
    assert!(stored.is_some());

    let stored = stored.unwrap();
    assert_eq!(stored.image, "ubuntu:22.04");
    assert_eq!(stored.memory_mb, 1024);
}
3890
/// Default-constructed options carry no sandbox configuration.
#[test]
fn test_session_options_default_has_no_sandbox() {
    let sandbox = SessionOptions::default().sandbox_config;
    assert!(sandbox.is_none());
}
3896
/// The `Debug` rendering of `SessionOptions` must mention the
/// `sandbox_config` field once a sandbox has been configured.
///
/// The test performs no asynchronous work, so it is a plain `#[test]` like
/// the other sandbox option tests and does not start a Tokio runtime.
#[test]
fn test_session_debug_includes_sandbox_config() {
    use crate::sandbox::SandboxConfig;
    let opts = SessionOptions::new().with_sandbox(SandboxConfig::default());
    let debug = format!("{:?}", opts);
    assert!(debug.contains("sandbox_config"));
}
3904
3905 #[tokio::test]
3906 async fn test_session_build_with_sandbox_config_no_feature_warn() {
3907 let agent = Agent::from_config(test_config()).await.unwrap();
3910 let opts = SessionOptions::new().with_sandbox(crate::sandbox::SandboxConfig::default());
3911 let session = agent.session("/tmp/test-sandbox-session", Some(opts));
3913 assert!(session.is_ok());
3914 }
3915
3916 #[tokio::test(flavor = "multi_thread")]
3921 async fn test_session_with_memory_store() {
3922 use a3s_memory::InMemoryStore;
3923 let store = Arc::new(InMemoryStore::new());
3924 let agent = Agent::from_config(test_config()).await.unwrap();
3925 let opts = SessionOptions::new().with_memory(store);
3926 let session = agent.session("/tmp/test-ws-memory", Some(opts)).unwrap();
3927 assert!(session.memory().is_some());
3928 }
3929
3930 #[tokio::test(flavor = "multi_thread")]
3931 async fn test_session_without_memory_store() {
3932 let agent = Agent::from_config(test_config()).await.unwrap();
3933 let session = agent.session("/tmp/test-ws-no-memory", None).unwrap();
3934 assert!(session.memory().is_none());
3935 }
3936
3937 #[tokio::test(flavor = "multi_thread")]
3938 async fn test_session_memory_wired_into_config() {
3939 use a3s_memory::InMemoryStore;
3940 let store = Arc::new(InMemoryStore::new());
3941 let agent = Agent::from_config(test_config()).await.unwrap();
3942 let opts = SessionOptions::new().with_memory(store);
3943 let session = agent
3944 .session("/tmp/test-ws-mem-config", Some(opts))
3945 .unwrap();
3946 assert!(session.memory().is_some());
3948 }
3949
3950 #[tokio::test(flavor = "multi_thread")]
3951 async fn test_session_with_file_memory() {
3952 let dir = tempfile::TempDir::new().unwrap();
3953 let agent = Agent::from_config(test_config()).await.unwrap();
3954 let opts = SessionOptions::new().with_file_memory(dir.path());
3955 let session = agent.session("/tmp/test-ws-file-mem", Some(opts)).unwrap();
3956 assert!(session.memory().is_some());
3957 }
3958
3959 #[tokio::test(flavor = "multi_thread")]
3960 async fn test_memory_remember_and_recall() {
3961 use a3s_memory::InMemoryStore;
3962 let store = Arc::new(InMemoryStore::new());
3963 let agent = Agent::from_config(test_config()).await.unwrap();
3964 let opts = SessionOptions::new().with_memory(store);
3965 let session = agent
3966 .session("/tmp/test-ws-mem-recall", Some(opts))
3967 .unwrap();
3968
3969 let memory = session.memory().unwrap();
3970 memory
3971 .remember_success("write a file", &["write".to_string()], "done")
3972 .await
3973 .unwrap();
3974
3975 let results = memory.recall_similar("write", 5).await.unwrap();
3976 assert!(!results.is_empty());
3977 let stats = memory.stats().await.unwrap();
3978 assert_eq!(stats.long_term_count, 1);
3979 }
3980
3981 #[tokio::test(flavor = "multi_thread")]
3986 async fn test_session_tool_timeout_configured() {
3987 let agent = Agent::from_config(test_config()).await.unwrap();
3988 let opts = SessionOptions::new().with_tool_timeout(5000);
3989 let session = agent.session("/tmp/test-ws-timeout", Some(opts)).unwrap();
3990 assert!(!session.id().is_empty());
3991 }
3992
3993 #[tokio::test(flavor = "multi_thread")]
3998 async fn test_session_without_queue_builds_ok() {
3999 let agent = Agent::from_config(test_config()).await.unwrap();
4000 let session = agent.session("/tmp/test-ws-no-queue", None).unwrap();
4001 assert!(!session.id().is_empty());
4002 }
4003
4004 #[tokio::test(flavor = "multi_thread")]
4009 async fn test_concurrent_history_reads() {
4010 let agent = Agent::from_config(test_config()).await.unwrap();
4011 let session = Arc::new(agent.session("/tmp/test-ws-concurrent", None).unwrap());
4012
4013 let handles: Vec<_> = (0..10)
4014 .map(|_| {
4015 let s = Arc::clone(&session);
4016 tokio::spawn(async move { s.history().len() })
4017 })
4018 .collect();
4019
4020 for h in handles {
4021 h.await.unwrap();
4022 }
4023 }
4024
4025 #[tokio::test(flavor = "multi_thread")]
4030 async fn test_session_no_init_warning_without_file_memory() {
4031 let agent = Agent::from_config(test_config()).await.unwrap();
4032 let session = agent.session("/tmp/test-ws-no-warn", None).unwrap();
4033 assert!(session.init_warning().is_none());
4034 }
4035
4036 #[tokio::test(flavor = "multi_thread")]
4037 async fn test_register_agent_dir_loads_agents_into_live_session() {
4038 let temp_dir = tempfile::tempdir().unwrap();
4039
4040 std::fs::write(
4042 temp_dir.path().join("my-agent.yaml"),
4043 "name: my-dynamic-agent\ndescription: Dynamically registered agent\n",
4044 )
4045 .unwrap();
4046
4047 let agent = Agent::from_config(test_config()).await.unwrap();
4048 let session = agent.session(".", None).unwrap();
4049
4050 assert!(!session.agent_registry.exists("my-dynamic-agent"));
4052
4053 let count = session.register_agent_dir(temp_dir.path());
4054 assert_eq!(count, 1);
4055 assert!(session.agent_registry.exists("my-dynamic-agent"));
4056 }
4057
4058 #[tokio::test(flavor = "multi_thread")]
4059 async fn test_register_agent_dir_empty_dir_returns_zero() {
4060 let temp_dir = tempfile::tempdir().unwrap();
4061 let agent = Agent::from_config(test_config()).await.unwrap();
4062 let session = agent.session(".", None).unwrap();
4063 let count = session.register_agent_dir(temp_dir.path());
4064 assert_eq!(count, 0);
4065 }
4066
4067 #[tokio::test(flavor = "multi_thread")]
4068 async fn test_register_agent_dir_nonexistent_returns_zero() {
4069 let agent = Agent::from_config(test_config()).await.unwrap();
4070 let session = agent.session(".", None).unwrap();
4071 let count = session.register_agent_dir(std::path::Path::new("/nonexistent/path/abc"));
4072 assert_eq!(count, 0);
4073 }
4074
4075 #[tokio::test(flavor = "multi_thread")]
4076 async fn test_session_with_mcp_manager_builds_ok() {
4077 use crate::mcp::manager::McpManager;
4078 let mcp = Arc::new(McpManager::new());
4079 let agent = Agent::from_config(test_config()).await.unwrap();
4080 let opts = SessionOptions::new().with_mcp(mcp);
4081 let session = agent.session("/tmp/test-ws-mcp", Some(opts)).unwrap();
4083 assert!(!session.id().is_empty());
4084 }
4085
/// Compile-time check: `SessionCommand` is importable from the crate root and
/// usable as a trait object. There is nothing to assert at runtime.
#[test]
fn test_session_command_is_pub() {
    use crate::SessionCommand;
    let _: std::marker::PhantomData<Box<dyn SessionCommand>> = std::marker::PhantomData;
}
4092
4093 #[tokio::test(flavor = "multi_thread")]
4094 async fn test_session_submit_with_queue_executes() {
4095 let agent = Agent::from_config(test_config()).await.unwrap();
4096 let qc = SessionQueueConfig::default();
4097 let opts = SessionOptions::new().with_queue_config(qc);
4098 let session = agent
4099 .session("/tmp/test-ws-submit-exec", Some(opts))
4100 .unwrap();
4101
4102 struct Echo(serde_json::Value);
4103 #[async_trait::async_trait]
4104 impl crate::queue::SessionCommand for Echo {
4105 async fn execute(&self) -> anyhow::Result<serde_json::Value> {
4106 Ok(self.0.clone())
4107 }
4108 fn command_type(&self) -> &str {
4109 "echo"
4110 }
4111 }
4112
4113 let rx = session
4114 .submit(
4115 SessionLane::Query,
4116 Box::new(Echo(serde_json::json!({"ok": true}))),
4117 )
4118 .await
4119 .expect("submit should succeed with queue configured");
4120
4121 let result = tokio::time::timeout(std::time::Duration::from_secs(2), rx)
4122 .await
4123 .expect("timed out waiting for command result")
4124 .expect("channel closed before result")
4125 .expect("command returned an error");
4126
4127 assert_eq!(result["ok"], true);
4128 }
4129}