1use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use tracing::error;
19
20use crate::background::{
21 BackgroundEventSink, BackgroundExecutableTool, BackgroundOutcome, BackgroundProgress,
22};
23use crate::session_resource::{RegisterSessionResource, SessionResourceStatus};
24use crate::session_schedule::MAX_ACTIVE_SCHEDULES_PER_SESSION;
25use crate::tool_types::{
26 BuiltinTool, DeferrablePolicy, ToolCall, ToolDefinition, ToolHints, ToolPolicy, ToolResult,
27};
28use crate::traits::ToolContext;
29use crate::typed_id::SessionId;
30use tokio::sync::Semaphore;
31
32use crate::error::Result;
33use crate::traits::ToolExecutor;
34
35pub const MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION: usize = 5;
40
41const MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER: usize = 64;
46static ACTIVE_BACKGROUND_RUNS_PER_WORKER: Semaphore =
47 Semaphore::const_new(MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER);
48
49static SESSION_BACKGROUND_PERMITS: OnceLock<std::sync::Mutex<HashMap<SessionId, Arc<Semaphore>>>> =
55 OnceLock::new();
56
57fn session_background_semaphore(session_id: SessionId) -> Arc<Semaphore> {
58 SESSION_BACKGROUND_PERMITS
59 .get_or_init(Default::default)
60 .lock()
61 .unwrap()
62 .entry(session_id)
63 .or_insert_with(|| Arc::new(Semaphore::new(MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION)))
64 .clone()
65}
66
/// An image attached to a successful tool result
/// (carried by `ToolExecutionResult::SuccessWithImages`).
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResultImage {
    /// Image bytes, base64-encoded.
    /// NOTE(review): presumably raw base64 without a `data:` URI prefix — confirm with consumers.
    pub base64: String,
    /// Media type of the image.
    /// NOTE(review): assumed to be a MIME type such as `image/png` — confirm.
    pub media_type: String,
}
82
/// Outcome of a single tool invocation, before it is converted into a [`ToolResult`]
/// by `ToolExecutionResult::into_tool_result`.
#[derive(Debug)]
pub enum ToolExecutionResult {
    /// Successful result. An object may carry a `_raw_output` string (see
    /// `success_with_raw_output`) that is split out during conversion.
    Success(Value),

    /// Successful result with attached images; an empty `images` list is
    /// reported as `None` during conversion.
    SuccessWithImages {
        result: Value,
        images: Vec<ToolResultImage>,
    },

    /// Error caused by the request itself; the message is shown to the model verbatim.
    ToolError(String),

    /// Unexpected failure inside the tool; details are logged and the model only
    /// sees a generic message.
    InternalError(ToolInternalError),

    /// The tool cannot run until the user connects the named provider.
    ConnectionRequired {
        provider: String,
    },
}
135
136impl ToolExecutionResult {
137 pub fn success(value: impl Into<Value>) -> Self {
139 ToolExecutionResult::Success(value.into())
140 }
141
142 pub fn success_with_raw_output(value: impl Into<Value>, raw_output: String) -> Self {
145 let mut value = value.into();
146 match value.as_object_mut() {
150 Some(obj) => {
151 obj.insert("_raw_output".to_string(), Value::String(raw_output));
152 }
153 None => {
154 value = serde_json::json!({
155 "_raw_output_scalar": value,
156 "_raw_output": raw_output,
157 });
158 }
159 }
160 ToolExecutionResult::Success(value)
161 }
162
163 pub fn success_with_images(value: impl Into<Value>, images: Vec<ToolResultImage>) -> Self {
165 ToolExecutionResult::SuccessWithImages {
166 result: value.into(),
167 images,
168 }
169 }
170
171 pub fn tool_error(message: impl Into<String>) -> Self {
173 ToolExecutionResult::ToolError(message.into())
174 }
175
176 pub fn internal_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
178 ToolExecutionResult::InternalError(ToolInternalError::new(error))
179 }
180
181 pub fn internal_error_msg(message: impl Into<String>) -> Self {
183 ToolExecutionResult::InternalError(ToolInternalError::from_message(message))
184 }
185
186 pub fn connection_required(provider: impl Into<String>) -> Self {
188 ToolExecutionResult::ConnectionRequired {
189 provider: provider.into(),
190 }
191 }
192
193 pub fn is_success(&self) -> bool {
195 matches!(
196 self,
197 ToolExecutionResult::Success(_) | ToolExecutionResult::SuccessWithImages { .. }
198 )
199 }
200
201 pub fn is_error(&self) -> bool {
203 matches!(
204 self,
205 ToolExecutionResult::ToolError(_) | ToolExecutionResult::InternalError(_)
206 )
207 }
208
209 pub fn is_connection_required(&self) -> bool {
211 matches!(self, ToolExecutionResult::ConnectionRequired { .. })
212 }
213
214 pub fn into_tool_result(self, tool_call_id: &str, tool_name: &str) -> ToolResult {
222 match self {
223 ToolExecutionResult::Success(mut value) => {
224 let raw_output = value
226 .as_object_mut()
227 .and_then(|obj| obj.remove("_raw_output"))
228 .and_then(|v| v.as_str().map(|s| s.to_string()));
229 let result_value = if let Some(obj) = value.as_object_mut() {
232 let is_scalar_carrier = raw_output.is_some()
233 && obj.len() == 1
234 && obj.contains_key("_raw_output_scalar");
235 if is_scalar_carrier {
236 obj.remove("_raw_output_scalar").unwrap_or(Value::Null)
237 } else {
238 value
239 }
240 } else {
241 value
242 };
243 ToolResult {
244 tool_call_id: tool_call_id.to_string(),
245 result: Some(result_value),
246 images: None,
247 error: None,
248 connection_required: None,
249 raw_output,
250 }
251 }
252 ToolExecutionResult::SuccessWithImages { result, images } => ToolResult {
253 tool_call_id: tool_call_id.to_string(),
254 result: Some(result),
255 images: if images.is_empty() {
256 None
257 } else {
258 Some(images)
259 },
260 error: None,
261 connection_required: None,
262 raw_output: None,
263 },
264 ToolExecutionResult::ToolError(message) => ToolResult {
265 tool_call_id: tool_call_id.to_string(),
266 result: Some(serde_json::json!({ "error": &message })),
267 images: None,
268 error: Some(message),
269 connection_required: None,
270 raw_output: None,
271 },
272 ToolExecutionResult::InternalError(err) => {
273 error!(
275 tool_name = %tool_name,
276 tool_call_id = %tool_call_id,
277 error = %err.message,
278 error_chain = %err.chain_string(),
279 "Tool internal error (details hidden from LLM)"
280 );
281
282 let generic_msg = "An internal error occurred while executing the tool";
284 ToolResult {
285 tool_call_id: tool_call_id.to_string(),
286 result: Some(serde_json::json!({
287 "error": generic_msg
288 })),
289 images: None,
290 error: Some(generic_msg.to_string()),
291 connection_required: None,
292 raw_output: None,
293 }
294 }
295 ToolExecutionResult::ConnectionRequired { ref provider } => ToolResult {
296 tool_call_id: tool_call_id.to_string(),
297 result: Some(serde_json::json!({
298 "connection_required": provider,
299 })),
300 images: None,
301 error: None,
302 connection_required: Some(provider.clone()),
303 raw_output: None,
304 },
305 }
306 }
307}
308
/// An internal tool failure whose details are meant for logs, not for the model.
///
/// `message` is the display text. `source`, when present, keeps the original error so
/// that [`ToolInternalError::chain_string`] can render the full cause chain.
#[derive(Debug)]
pub struct ToolInternalError {
    pub message: String,
    pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl ToolInternalError {
    /// Captures `error`'s display text and keeps the error itself as the source.
    pub fn new(error: impl std::error::Error + Send + Sync + 'static) -> Self {
        let message = error.to_string();
        let boxed: Box<dyn std::error::Error + Send + Sync> = Box::new(error);
        Self {
            message,
            source: Some(boxed),
        }
    }

    /// Builds an error from a bare message, with no underlying source.
    pub fn from_message(message: impl Into<String>) -> Self {
        Self {
            source: None,
            message: message.into(),
        }
    }

    /// Renders `message` followed by each cause in the source chain, joined by `": "`.
    ///
    /// A cause whose text equals the segment right before it is skipped. This hides the
    /// duplicate produced by `new`, where `message` is the source's own display text.
    pub fn chain_string(&self) -> String {
        let causes =
            std::iter::successors(std::error::Error::source(self), |cause| cause.source());
        let mut segments = vec![self.message.clone()];
        for cause in causes {
            let text = cause.to_string();
            if segments.last() != Some(&text) {
                segments.push(text);
            }
        }
        segments.join(": ")
    }
}

impl std::fmt::Display for ToolInternalError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for ToolInternalError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner = self.source.as_deref()?;
        Some(inner as &(dyn std::error::Error + 'static))
    }
}
362
363#[async_trait]
412pub trait Tool: Send + Sync {
413 fn name(&self) -> &str;
418
419 fn display_name(&self) -> Option<&str> {
426 None
427 }
428
429 fn description(&self) -> &str;
434
435 fn parameters_schema(&self) -> Value;
441
442 async fn execute(&self, arguments: Value) -> ToolExecutionResult;
453
454 async fn execute_with_context(
471 &self,
472 arguments: Value,
473 _context: &ToolContext,
474 ) -> ToolExecutionResult {
475 self.execute(arguments).await
477 }
478
479 fn requires_context(&self) -> bool {
484 false
485 }
486
487 fn policy(&self) -> ToolPolicy {
492 ToolPolicy::Auto
493 }
494
495 fn hints(&self) -> ToolHints {
500 ToolHints::default()
501 }
502
503 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
506 None
507 }
508
509 fn to_definition(&self) -> ToolDefinition {
514 ToolDefinition::Builtin(BuiltinTool {
515 name: self.name().to_string(),
516 display_name: self.display_name().map(|s| s.to_string()),
517 description: self.description().to_string(),
518 parameters: self.parameters_schema(),
519 policy: self.policy(),
520 category: None,
521 deferrable: DeferrablePolicy::default(),
522 hints: self.hints(),
523 full_parameters: None,
524 })
525 }
526}
527
528#[derive(Default, Clone)]
555pub struct ToolRegistry {
556 tools: HashMap<String, Arc<dyn Tool>>,
557}
558
559impl ToolRegistry {
560 pub fn new() -> Self {
562 Self {
563 tools: HashMap::new(),
564 }
565 }
566
567 pub fn with_defaults() -> Self {
579 use crate::capabilities::{
580 AddTool, DeleteFileTool, DivideTool, EditFileTool, GetCurrentTimeTool, GetForecastTool,
581 GetWeatherTool, GrepFilesTool, ListDirectoryTool, MultiplyTool, ReadFileTool,
582 StatFileTool, SubtractTool, WebFetchTool, WriteFileTool, WriteTodosTool,
583 };
584 use crate::progress_reporting::ReportProgressTool;
585
586 ToolRegistry::builder()
587 .tool(GetCurrentTimeTool)
588 .tool(EchoTool)
589 .tool(ReportProgressTool)
599 .tool(AddTool)
601 .tool(SubtractTool)
602 .tool(MultiplyTool)
603 .tool(DivideTool)
604 .tool(GetWeatherTool)
606 .tool(GetForecastTool)
607 .tool(WriteTodosTool)
609 .tool(ReadFileTool)
611 .tool(WriteFileTool)
612 .tool(EditFileTool)
613 .tool(ListDirectoryTool)
614 .tool(GrepFilesTool)
615 .tool(DeleteFileTool)
616 .tool(StatFileTool)
617 .tool(WebFetchTool::default())
619 .build()
620 }
621
622 pub fn register(&mut self, tool: impl Tool + 'static) {
626 self.tools.insert(tool.name().to_string(), Arc::new(tool));
627 }
628
629 pub fn register_boxed(&mut self, tool: Box<dyn Tool>) {
631 self.tools.insert(tool.name().to_string(), Arc::from(tool));
632 }
633
634 pub fn register_arc(&mut self, tool: Arc<dyn Tool>) {
636 self.tools.insert(tool.name().to_string(), tool);
637 }
638
639 pub fn get(&self, name: &str) -> Option<&Arc<dyn Tool>> {
641 self.tools.get(name)
642 }
643
644 pub fn has(&self, name: &str) -> bool {
646 self.tools.contains_key(name)
647 }
648
649 pub fn len(&self) -> usize {
651 self.tools.len()
652 }
653
654 pub fn is_empty(&self) -> bool {
656 self.tools.is_empty()
657 }
658
659 pub fn tool_names(&self) -> Vec<&str> {
661 self.tools.keys().map(|s| s.as_str()).collect()
662 }
663
664 pub fn tool_definitions(&self) -> Vec<ToolDefinition> {
669 self.tools.values().map(|t| t.to_definition()).collect()
670 }
671
672 pub fn unregister(&mut self, name: &str) -> Option<Arc<dyn Tool>> {
674 self.tools.remove(name)
675 }
676
677 pub fn clear(&mut self) {
679 self.tools.clear();
680 }
681
682 pub fn builder() -> ToolRegistryBuilder {
684 ToolRegistryBuilder::new()
685 }
686}
687
688impl std::fmt::Debug for ToolRegistry {
689 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690 f.debug_struct("ToolRegistry")
691 .field("tools", &self.tool_names())
692 .finish()
693 }
694}
695
696#[async_trait]
697impl ToolExecutor for ToolRegistry {
698 async fn execute(
699 &self,
700 tool_call: &ToolCall,
701 _tool_def: &ToolDefinition,
702 ) -> Result<ToolResult> {
703 let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
704 crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
705 })?;
706
707 let result = tool.execute(tool_call.arguments.clone()).await;
708 Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
709 }
710
711 async fn execute_with_context(
712 &self,
713 tool_call: &ToolCall,
714 _tool_def: &ToolDefinition,
715 context: &ToolContext,
716 ) -> Result<ToolResult> {
717 let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
718 crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
719 })?;
720
721 let result = tool
724 .execute_with_context(tool_call.arguments.clone(), context)
725 .await;
726 Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
727 }
728}
729
730pub struct ToolRegistryBuilder {
745 registry: ToolRegistry,
746}
747
748impl ToolRegistryBuilder {
749 pub fn new() -> Self {
751 Self {
752 registry: ToolRegistry::new(),
753 }
754 }
755
756 pub fn tool(mut self, tool: impl Tool + 'static) -> Self {
758 self.registry.register(tool);
759 self
760 }
761
762 pub fn tool_boxed(mut self, tool: Box<dyn Tool>) -> Self {
764 self.registry.register_boxed(tool);
765 self
766 }
767
768 pub fn tool_arc(mut self, tool: Arc<dyn Tool>) -> Self {
770 self.registry.register_arc(tool);
771 self
772 }
773
774 pub fn build(self) -> ToolRegistry {
776 self.registry
777 }
778}
779
780impl Default for ToolRegistryBuilder {
781 fn default() -> Self {
782 Self::new()
783 }
784}
785
786pub struct EchoTool;
792
793#[async_trait]
794impl Tool for EchoTool {
795 fn name(&self) -> &str {
796 "echo"
797 }
798
799 fn display_name(&self) -> Option<&str> {
800 Some("Echo")
801 }
802
803 fn description(&self) -> &str {
804 "Echo back the provided message. Useful for testing tool execution."
805 }
806
807 fn parameters_schema(&self) -> Value {
808 serde_json::json!({
809 "type": "object",
810 "properties": {
811 "message": {
812 "type": "string",
813 "description": "The message to echo back"
814 }
815 },
816 "required": ["message"],
817 "additionalProperties": false
818 })
819 }
820
821 fn hints(&self) -> ToolHints {
822 ToolHints::default()
823 .with_readonly(true)
824 .with_idempotent(true)
825 }
826
827 async fn execute(&self, arguments: Value) -> ToolExecutionResult {
828 let message = arguments
829 .get("message")
830 .and_then(|v| v.as_str())
831 .unwrap_or("");
832
833 ToolExecutionResult::success(serde_json::json!({
834 "echoed": message,
835 "length": message.len()
836 }))
837 }
838}
839
/// Built-in `spawn_background` tool. It either starts another background-capable tool
/// on a detached task, or, when a `schedule` is given, creates a session schedule
/// whose description carries the spawn payload.
pub struct SpawnBackgroundTool;
842
/// Validated `schedule` argument of a `spawn_background` call.
///
/// `parse_background_schedule` only produces values where exactly one of
/// `cron_expression` / `scheduled_at` is set.
#[derive(Debug, Clone)]
struct BackgroundScheduleRequest {
    /// Trimmed, non-empty cron expression for a recurring monitor.
    cron_expression: Option<String>,
    /// One-shot trigger time, parsed from RFC 3339 and converted to UTC.
    scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Timezone name as supplied (trimmed); `"UTC"` when absent or blank. Not validated here.
    timezone: String,
}
849
850fn parse_background_schedule(
851 arguments: &Value,
852) -> std::result::Result<Option<BackgroundScheduleRequest>, String> {
853 let Some(schedule) = arguments.get("schedule") else {
854 return Ok(None);
855 };
856 let Some(schedule) = schedule.as_object() else {
857 return Err("schedule must be an object".to_string());
858 };
859
860 let cron_expression = schedule
861 .get("cron_expression")
862 .and_then(Value::as_str)
863 .map(str::trim)
864 .filter(|value| !value.is_empty())
865 .map(ToString::to_string);
866 let scheduled_at = match schedule.get("scheduled_at").and_then(Value::as_str) {
867 Some(value) => {
868 let value = value.trim();
869 if value.is_empty() {
870 None
871 } else {
872 Some(
873 chrono::DateTime::parse_from_rfc3339(value)
874 .map_err(|_| "scheduled_at must be RFC3339".to_string())?
875 .with_timezone(&chrono::Utc),
876 )
877 }
878 }
879 None => None,
880 };
881
882 match (cron_expression.is_some(), scheduled_at.is_some()) {
883 (false, false) => {
884 return Err(
885 "schedule must include exactly one of cron_expression (recurring) or scheduled_at (one-shot)"
886 .to_string(),
887 );
888 }
889 (true, true) => {
890 return Err(
891 "schedule must not include both cron_expression and scheduled_at; provide exactly one"
892 .to_string(),
893 );
894 }
895 _ => {}
896 }
897
898 let timezone = schedule
899 .get("timezone")
900 .and_then(Value::as_str)
901 .map(str::trim)
902 .filter(|value| !value.is_empty())
903 .unwrap_or("UTC")
904 .to_string();
905
906 Ok(Some(BackgroundScheduleRequest {
907 cron_expression,
908 scheduled_at,
909 timezone,
910 }))
911}
912
913fn build_background_schedule_description(
914 tool_name: &str,
915 tool_args: &Value,
916 title: &str,
917 signal_on_completion: bool,
918) -> String {
919 let payload = json!({
920 "tool": tool_name,
921 "title": title,
922 "signal_on_completion": signal_on_completion,
923 "args": tool_args,
924 });
925 let payload_json =
926 serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
927
928 format!(
929 "Monitor: {title}\n\n\
930This scheduled monitor fired. Start the background run now.\n\n\
931spawn_background payload:\n{payload_json}"
932 )
933}
934
935#[async_trait]
936impl Tool for SpawnBackgroundTool {
937 fn name(&self) -> &str {
938 "spawn_background"
939 }
940
941 fn display_name(&self) -> Option<&str> {
942 Some("Spawn Background")
943 }
944
945 fn description(&self) -> &str {
946 "Run a background-capable built-in tool asynchronously. Returns immediately and signals the session when the background run completes."
947 }
948
949 fn parameters_schema(&self) -> Value {
950 json!({
951 "type": "object",
952 "properties": {
953 "tool": {
954 "type": "string",
955 "description": "Name of the built-in tool to execute in the background"
956 },
957 "args": {
958 "type": "object",
959 "description": "Arguments to pass to the target tool"
960 },
961 "title": {
962 "type": "string",
963 "description": "Optional human-readable label for the background run"
964 },
965 "schedule": {
966 "type": "object",
967 "description": "Optional session schedule. When provided, this creates a scheduled monitor instead of starting the run immediately.",
968 "properties": {
969 "cron_expression": {
970 "type": "string",
971 "description": "Standard 5-field cron expression for recurring runs (e.g. '*/10 * * * *' for every 10 minutes)"
972 },
973 "scheduled_at": {
974 "type": "string",
975 "description": "ISO 8601 datetime for a one-shot run (e.g. '2026-04-16T15:30:00Z')"
976 },
977 "timezone": {
978 "type": "string",
979 "description": "IANA timezone for the schedule. Default: UTC"
980 }
981 },
982 "additionalProperties": false
983 },
984 "signal_on_completion": {
985 "type": "boolean",
986 "description": "Send a synthetic user message back to the session when the run completes",
987 "default": true
988 }
989 },
990 "required": ["tool", "args"],
991 "additionalProperties": false
992 })
993 }
994
995 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
996 ToolExecutionResult::tool_error(
997 "spawn_background requires context. This tool must be executed with session context.",
998 )
999 }
1000
1001 async fn execute_with_context(
1002 &self,
1003 arguments: Value,
1004 context: &ToolContext,
1005 ) -> ToolExecutionResult {
1006 let tool_name = match arguments.get("tool").and_then(|v| v.as_str()) {
1007 Some(name) if !name.trim().is_empty() => name.trim(),
1008 _ => return ToolExecutionResult::tool_error("Missing required parameter: tool"),
1009 };
1010 let tool_args = match arguments.get("args") {
1011 Some(args) if args.is_object() => args.clone(),
1012 _ => {
1013 return ToolExecutionResult::tool_error(
1014 "Missing required parameter: args (object expected)",
1015 );
1016 }
1017 };
1018 let signal_on_completion = arguments
1019 .get("signal_on_completion")
1020 .and_then(|v| v.as_bool())
1021 .unwrap_or(true);
1022 let schedule_request = match parse_background_schedule(&arguments) {
1023 Ok(schedule) => schedule,
1024 Err(message) => return ToolExecutionResult::tool_error(message),
1025 };
1026
1027 let Some(tool_registry) = &context.tool_registry else {
1028 return ToolExecutionResult::tool_error(
1029 "Tool registry not available in this context. spawn_background requires worker-side tool execution.",
1030 );
1031 };
1032
1033 let Some(tool) = tool_registry.get(tool_name).cloned() else {
1034 return ToolExecutionResult::tool_error(format!("Unknown tool: {tool_name}"));
1035 };
1036 if tool_name == self.name() {
1037 return ToolExecutionResult::tool_error(
1038 "spawn_background cannot target itself recursively",
1039 );
1040 }
1041 if tool.hints().supports_background != Some(true) {
1042 return ToolExecutionResult::tool_error(format!(
1043 "Tool does not support background execution: {tool_name}"
1044 ));
1045 }
1046 if tool.as_background_executable().is_none() {
1047 return ToolExecutionResult::tool_error(format!(
1048 "Tool declared background support but has no background executor: {tool_name}"
1049 ));
1050 }
1051 let title = arguments
1052 .get("title")
1053 .and_then(|v| v.as_str())
1054 .map(str::trim)
1055 .filter(|s| !s.is_empty())
1056 .map(|s| s.to_string())
1057 .unwrap_or_else(|| {
1058 tool.display_name()
1059 .map(ToString::to_string)
1060 .unwrap_or_else(|| format!("Background {tool_name}"))
1061 });
1062
1063 if let Some(schedule_request) = schedule_request {
1064 let Some(schedule_store) = &context.schedule_store else {
1065 return ToolExecutionResult::tool_error(
1066 "Schedule store not available in this context. Scheduled monitors require session schedules.",
1067 );
1068 };
1069
1070 match schedule_store
1071 .count_active_schedules(context.session_id)
1072 .await
1073 {
1074 Ok(count) if count >= MAX_ACTIVE_SCHEDULES_PER_SESSION => {
1075 return ToolExecutionResult::tool_error(format!(
1076 "Maximum {MAX_ACTIVE_SCHEDULES_PER_SESSION} active schedules per session. Cancel an existing schedule first."
1077 ));
1078 }
1079 Err(err) => return ToolExecutionResult::internal_error(err),
1080 _ => {}
1081 }
1082
1083 let description = build_background_schedule_description(
1084 tool_name,
1085 &tool_args,
1086 &title,
1087 signal_on_completion,
1088 );
1089
1090 return match schedule_store
1091 .create_schedule(
1092 context.session_id,
1093 description,
1094 schedule_request.cron_expression.clone(),
1095 schedule_request.scheduled_at,
1096 schedule_request.timezone.clone(),
1097 )
1098 .await
1099 {
1100 Ok(schedule) => ToolExecutionResult::success(json!({
1101 "created": true,
1102 "status": "scheduled",
1103 "title": title,
1104 "tool": tool_name,
1105 "signal_on_completion": signal_on_completion,
1106 "schedule_id": schedule.id.to_string(),
1107 "schedule_type": schedule.schedule_type,
1108 "cron_expression": schedule.cron_expression,
1109 "scheduled_at": schedule.scheduled_at,
1110 "timezone": schedule.timezone,
1111 "next_trigger_at": schedule.next_trigger_at,
1112 "enabled": schedule.enabled
1113 })),
1114 Err(err) => ToolExecutionResult::internal_error(err),
1115 };
1116 }
1117
1118 let Some(resource_registry) = &context.session_resource_registry else {
1119 return ToolExecutionResult::tool_error(
1120 "Session resource registry not available in this context",
1121 );
1122 };
1123 if context.file_store.is_none() {
1124 return ToolExecutionResult::tool_error(
1125 "Session file store not available in this context. spawn_background requires artifact persistence.",
1126 );
1127 }
1128
1129 let background_run_permit = match ACTIVE_BACKGROUND_RUNS_PER_WORKER.try_acquire() {
1130 Ok(permit) => permit,
1131 Err(_) => {
1132 return ToolExecutionResult::tool_error(format!(
1133 "Worker is already running the maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER} active background runs. Try again after an existing run finishes."
1134 ));
1135 }
1136 };
1137
1138 let session_run_permit = match session_background_semaphore(context.session_id)
1139 .try_acquire_owned()
1140 {
1141 Ok(permit) => permit,
1142 Err(_) => {
1143 return ToolExecutionResult::tool_error(format!(
1144 "Maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION} active background runs per session. Wait for an existing run to finish before starting another."
1145 ));
1146 }
1147 };
1148
1149 let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
1150 let artifact_dir = format!("/.background/{run_id}");
1151 let log_path = format!("{artifact_dir}/output.log");
1152 let result_path = format!("{artifact_dir}/result.json");
1153 let metadata = json!({
1154 "tool": tool_name,
1155 "status_text": "Queued",
1156 "signal_on_completion": signal_on_completion,
1157 "artifact_dir": artifact_dir,
1158 "log_path": log_path,
1159 "result_path": result_path,
1160 });
1161
1162 if let Err(e) = resource_registry
1163 .register(RegisterSessionResource {
1164 session_id: context.session_id,
1165 resource_id: run_id.clone(),
1166 kind: "background_run".to_string(),
1167 display_name: title.clone(),
1168 status: SessionResourceStatus::Active,
1169 metadata,
1170 })
1171 .await
1172 {
1173 return ToolExecutionResult::internal_error_msg(format!(
1174 "Failed to register background run: {e}"
1175 ));
1176 }
1177
1178 let background_context = context.clone().with_tool_registry(tool_registry.clone());
1179 let sink = Arc::new(SessionBackgroundSink::new(
1180 background_context.clone(),
1181 run_id.clone(),
1182 title.clone(),
1183 tool_name.to_string(),
1184 log_path.clone(),
1185 result_path.clone(),
1186 signal_on_completion,
1187 ));
1188 let run_id_for_task = run_id.clone();
1189 let tool_for_task = tool.clone();
1190 let tool_name_for_task = tool_name.to_string();
1191
1192 tokio::spawn(async move {
1193 let _background_run_permit = background_run_permit;
1194 let _session_run_permit = session_run_permit;
1195 let _ = sink.status("Starting").await;
1196 let outcome = match tool_for_task.as_background_executable() {
1197 Some(background_tool) => {
1198 background_tool
1199 .execute_background(tool_args, background_context, sink.clone())
1200 .await
1201 }
1202 None => Err(ToolExecutionResult::tool_error(format!(
1203 "Tool declared background support but has no background executor: {}",
1204 tool_name_for_task
1205 ))),
1206 };
1207
1208 if let Err(err) = sink.finalize(outcome).await {
1209 tracing::warn!(
1210 run_id = run_id_for_task,
1211 error = %err,
1212 "Background run finalization failed"
1213 );
1214 }
1215 });
1216
1217 ToolExecutionResult::success(json!({
1218 "run_id": run_id,
1219 "resource_id": run_id,
1220 "title": title,
1221 "tool": tool_name,
1222 "status": "running",
1223 "signal_on_completion": signal_on_completion,
1224 "artifact_dir": artifact_dir,
1225 "log_path": log_path,
1226 "result_path": result_path
1227 }))
1228 }
1229
1230 fn requires_context(&self) -> bool {
1231 true
1232 }
1233}
1234
/// Mutable, in-memory progress of one background run, guarded by the sink's mutex.
#[derive(Debug, Default)]
struct SessionBackgroundState {
    /// Short status line. Starts as "Queued", is replaced by `status()`, and ends as
    /// "Completed"/"Failed" in `finalize`.
    status_text: String,
    /// Most recent progress report, if any.
    progress: Option<BackgroundProgress>,
    /// Last 2048 chars of `[stream]`-prefixed output, published in resource metadata.
    output_tail: String,
    /// Accumulated `[stream]`-prefixed output, capped at `MAX_BACKGROUND_OUTPUT_LOG_CHARS`.
    output_log: String,
    /// Number of `char`s in `output_log`, maintained by `append_to_output_log`.
    output_log_chars: usize,
    /// Set once output had to be dropped because the log cap was reached.
    output_log_truncated: bool,
}

/// Cap (in `char`s, not bytes) on the streamed output kept for `output.log`.
/// Not applied when the tool supplies its own `raw_output`.
const MAX_BACKGROUND_OUTPUT_LOG_CHARS: usize = 256 * 1024;
1246
/// [`BackgroundEventSink`] for one background run. It mirrors progress into the
/// session resource registry, persists artifacts, and optionally signals the session.
struct SessionBackgroundSink {
    /// Context of the spawning call (with the tool registry attached).
    context: ToolContext,
    /// Run identifier, also used as the session resource id.
    run_id: String,
    /// Human-readable title shown for the resource.
    display_name: String,
    /// Name of the tool being run.
    tool_name: String,
    /// Session-relative path of `output.log`.
    log_path: String,
    /// Session-relative path of `result.json`.
    result_path: String,
    /// Whether to post a message to the session when the run ends.
    signal_on_completion: bool,
    /// Live state; a tokio mutex so it can be locked from async callbacks.
    state: tokio::sync::Mutex<SessionBackgroundState>,
}
1257
1258impl SessionBackgroundSink {
1259 fn new(
1260 context: ToolContext,
1261 run_id: String,
1262 display_name: String,
1263 tool_name: String,
1264 log_path: String,
1265 result_path: String,
1266 signal_on_completion: bool,
1267 ) -> Self {
1268 Self {
1269 context,
1270 run_id,
1271 display_name,
1272 tool_name,
1273 log_path,
1274 result_path,
1275 signal_on_completion,
1276 state: tokio::sync::Mutex::new(SessionBackgroundState {
1277 status_text: "Queued".to_string(),
1278 ..Default::default()
1279 }),
1280 }
1281 }
1282
1283 async fn finalize(
1284 &self,
1285 outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1286 ) -> Result<()> {
1287 match outcome {
1288 Ok(outcome) => {
1289 let output_log = if let Some(raw_output) = &outcome.raw_output {
1290 raw_output.clone()
1291 } else {
1292 let state = self.state.lock().await;
1293 Self::final_output_log(&state)
1294 };
1295 self.write_text_file(&self.log_path, &output_log).await?;
1296 let result_json = serde_json::to_string_pretty(&outcome.result)
1297 .unwrap_or_else(|_| outcome.result.to_string());
1298 self.write_text_file(&self.result_path, &result_json)
1299 .await?;
1300
1301 let mut state = self.state.lock().await;
1302 state.status_text = "Completed".to_string();
1303 drop(state);
1304 self.update_resource(SessionResourceStatus::Completed, Some(&outcome.summary))
1305 .await?;
1306 if self.signal_on_completion {
1307 self.signal_session("completed", &outcome.summary).await?;
1308 }
1309 }
1310 Err(err) => {
1311 let message = match err {
1312 ToolExecutionResult::ToolError(msg) => msg,
1313 ToolExecutionResult::InternalError(inner) => inner.message,
1314 ToolExecutionResult::ConnectionRequired { provider } => {
1315 format!("Background tool requires connection setup: {provider}")
1316 }
1317 ToolExecutionResult::Success(_)
1318 | ToolExecutionResult::SuccessWithImages { .. } => {
1319 "Background run ended unexpectedly".to_string()
1320 }
1321 };
1322 let output_log = {
1323 let state = self.state.lock().await;
1324 Self::final_output_log(&state)
1325 };
1326 self.write_text_file(&self.log_path, &output_log).await?;
1327 let error_json = serde_json::to_string_pretty(&json!({
1328 "status": "failed",
1329 "error": &message,
1330 }))
1331 .unwrap_or_else(|_| {
1332 json!({
1333 "status": "failed",
1334 "error": &message,
1335 })
1336 .to_string()
1337 });
1338 self.write_text_file(&self.result_path, &error_json).await?;
1339 let mut state = self.state.lock().await;
1340 state.status_text = "Failed".to_string();
1341 drop(state);
1342 self.update_resource(SessionResourceStatus::Failed, Some(&message))
1343 .await?;
1344 if self.signal_on_completion {
1345 self.signal_session("failed", &message).await?;
1346 }
1347 }
1348 }
1349
1350 Ok(())
1351 }
1352
1353 async fn signal_session(&self, status: &str, summary: &str) -> Result<()> {
1354 let Some(platform_store) = &self.context.platform_store else {
1355 return Ok(());
1356 };
1357 let message = format!(
1358 "Background run {status}.\n- run_id: {}\n- title: {}\n- tool: {}\n- summary: {}\n- result_path: {}\n- log_path: {}",
1359 self.run_id,
1360 self.display_name,
1361 self.tool_name,
1362 summary,
1363 self.result_path,
1364 self.log_path
1365 );
1366 platform_store
1367 .send_message(self.context.session_id, &message)
1368 .await
1369 }
1370
1371 async fn update_resource(
1372 &self,
1373 status: SessionResourceStatus,
1374 summary: Option<&str>,
1375 ) -> Result<()> {
1376 let Some(registry) = &self.context.session_resource_registry else {
1377 return Ok(());
1378 };
1379 let state = self.state.lock().await;
1380 let status_text = state.status_text.clone();
1381 let progress = state.progress.clone();
1382 let output_tail = state.output_tail.clone();
1383 drop(state);
1384 registry
1385 .register(RegisterSessionResource {
1386 session_id: self.context.session_id,
1387 resource_id: self.run_id.clone(),
1388 kind: "background_run".to_string(),
1389 display_name: self.display_name.clone(),
1390 status,
1391 metadata: json!({
1392 "tool": self.tool_name,
1393 "status_text": status_text,
1394 "progress": progress,
1395 "output_tail": output_tail,
1396 "log_path": self.log_path,
1397 "result_path": self.result_path,
1398 "summary": summary,
1399 "signal_on_completion": self.signal_on_completion,
1400 }),
1401 })
1402 .await?;
1403 Ok(())
1404 }
1405
1406 async fn write_text_file(&self, path: &str, content: &str) -> Result<()> {
1407 let file_store = self.context.file_store.as_ref().ok_or_else(|| {
1408 anyhow::anyhow!(
1409 "background run {} cannot persist artifact {} because no session file store is configured",
1410 self.run_id,
1411 path
1412 )
1413 })?;
1414
1415 ensure_directory(file_store.as_ref(), self.context.session_id, "/.background").await?;
1416 let run_dir = format!("/.background/{}", self.run_id);
1417 ensure_directory(file_store.as_ref(), self.context.session_id, &run_dir).await?;
1418 file_store
1419 .write_file(self.context.session_id, path, content, "text")
1420 .await?;
1421 Ok(())
1422 }
1423}
1424
1425#[async_trait]
1426impl BackgroundEventSink for SessionBackgroundSink {
1427 async fn status(&self, message: &str) -> Result<()> {
1428 let mut state = self.state.lock().await;
1429 state.status_text = message.to_string();
1430 drop(state);
1431 self.update_resource(SessionResourceStatus::Active, None)
1432 .await
1433 }
1434
1435 async fn output(&self, stream: &str, delta: &str) -> Result<()> {
1436 let mut state = self.state.lock().await;
1437 if !delta.is_empty() {
1438 let prefix = format!("[{stream}] ");
1439 state.output_tail.push_str(&prefix);
1440 state.output_tail.push_str(delta);
1441 Self::append_to_output_log(&mut state, &prefix, delta);
1442 if state.output_tail.chars().count() > 2048 {
1443 state.output_tail = state
1444 .output_tail
1445 .chars()
1446 .rev()
1447 .take(2048)
1448 .collect::<Vec<_>>()
1449 .into_iter()
1450 .rev()
1451 .collect();
1452 }
1453 }
1454 drop(state);
1455 self.update_resource(SessionResourceStatus::Active, None)
1456 .await
1457 }
1458
1459 async fn progress(&self, progress: BackgroundProgress) -> Result<()> {
1460 let mut state = self.state.lock().await;
1461 state.progress = Some(progress);
1462 drop(state);
1463 self.update_resource(SessionResourceStatus::Active, None)
1464 .await
1465 }
1466}
1467
1468impl SessionBackgroundSink {
1469 fn append_to_output_log(state: &mut SessionBackgroundState, prefix: &str, delta: &str) {
1470 if state.output_log_chars >= MAX_BACKGROUND_OUTPUT_LOG_CHARS {
1471 state.output_log_truncated = true;
1472 return;
1473 }
1474
1475 let chunk = format!("{prefix}{delta}");
1476 let remaining = MAX_BACKGROUND_OUTPUT_LOG_CHARS - state.output_log_chars;
1477 let chunk_chars = chunk.chars().count();
1478
1479 if chunk_chars <= remaining {
1480 state.output_log.push_str(&chunk);
1481 state.output_log_chars += chunk_chars;
1482 return;
1483 }
1484
1485 let truncated_chunk: String = chunk.chars().take(remaining).collect();
1486 state.output_log.push_str(&truncated_chunk);
1487 state.output_log_chars += truncated_chunk.chars().count();
1488 state.output_log_truncated = true;
1489 }
1490
1491 fn final_output_log(state: &SessionBackgroundState) -> String {
1492 if !state.output_log_truncated {
1493 return state.output_log.clone();
1494 }
1495
1496 format!(
1497 "{}\n[system] background output truncated at {} characters\n",
1498 state.output_log, MAX_BACKGROUND_OUTPUT_LOG_CHARS
1499 )
1500 }
1501}
1502
1503async fn ensure_directory(
1504 file_store: &dyn crate::traits::SessionFileSystem,
1505 session_id: crate::SessionId,
1506 path: &str,
1507) -> Result<()> {
1508 if let Some(entry) = file_store.stat_file(session_id, path).await? {
1509 if entry.is_directory {
1510 return Ok(());
1511 }
1512 return Err(anyhow::anyhow!("path exists but is not a directory: {path}").into());
1513 }
1514 let _ = file_store.create_directory(session_id, path).await?;
1515 Ok(())
1516}
1517
/// Test double whose `execute` always fails with a configurable message.
///
/// Depending on the constructor, the failure is reported either as a tool error
/// or as an internal error.
pub struct FailingTool {
    // Message carried by the failure result.
    error_message: String,
    // `true` to fail with an internal error, `false` for a tool error.
    use_internal_error: bool,
}

impl FailingTool {
    /// Builds a tool that fails with a tool error carrying `message`.
    pub fn with_tool_error(message: impl Into<String>) -> Self {
        let error_message: String = message.into();
        Self {
            use_internal_error: false,
            error_message,
        }
    }

    /// Builds a tool that fails with an internal error carrying `message`.
    pub fn with_internal_error(message: impl Into<String>) -> Self {
        let error_message: String = message.into();
        Self {
            use_internal_error: true,
            error_message,
        }
    }
}

impl Default for FailingTool {
    /// Fails with a tool error carrying the generic message "Tool execution failed".
    fn default() -> Self {
        FailingTool::with_tool_error(String::from("Tool execution failed"))
    }
}
1547
1548#[async_trait]
1549impl Tool for FailingTool {
1550 fn name(&self) -> &str {
1551 "failing_tool"
1552 }
1553
1554 fn display_name(&self) -> Option<&str> {
1555 Some("Failing Tool")
1556 }
1557
1558 fn description(&self) -> &str {
1559 "A tool that always fails (for testing error handling)"
1560 }
1561
1562 fn parameters_schema(&self) -> Value {
1563 serde_json::json!({
1564 "type": "object",
1565 "properties": {},
1566 "additionalProperties": false
1567 })
1568 }
1569
1570 fn hints(&self) -> ToolHints {
1571 ToolHints::default()
1572 .with_readonly(true)
1573 .with_idempotent(true)
1574 }
1575
1576 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1577 if self.use_internal_error {
1578 ToolExecutionResult::internal_error_msg(&self.error_message)
1579 } else {
1580 ToolExecutionResult::tool_error(&self.error_message)
1581 }
1582 }
1583}
1584
1585#[cfg(test)]
1590mod tests {
1591 use super::*;
1592 use crate::capabilities::GetCurrentTimeTool;
1593 use crate::platform_store::PlatformStore;
1594 use crate::session_file::{FileInfo, FileStat, SessionFile};
1595 use crate::session_resource::{SessionResourceEntry, SessionResourceFilter};
1596 use crate::traits::{SessionFileSystem, SessionResourceRegistry, SessionScheduleStore};
1597 use crate::typed_id::{HarnessId, SessionId};
1598 use crate::{AgentId, KeyInfo, PlatformMessage, SecretInfo};
1599 use async_trait::async_trait;
1600 use std::sync::{
1601 Arc as StdArc, Mutex,
1602 atomic::{AtomicBool, Ordering},
1603 };
1604
1605 #[derive(Default)]
1606 struct TestBackgroundTool;
1607
1608 #[async_trait]
1609 impl BackgroundExecutableTool for TestBackgroundTool {
1610 async fn execute_background(
1611 &self,
1612 arguments: Value,
1613 _context: ToolContext,
1614 sink: Arc<dyn BackgroundEventSink>,
1615 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1616 sink.status("Waiting for test result")
1617 .await
1618 .map_err(ToolExecutionResult::internal_error)?;
1619 sink.output("stdout", "hello from background")
1620 .await
1621 .map_err(ToolExecutionResult::internal_error)?;
1622 sink.progress(BackgroundProgress {
1623 current: Some(1),
1624 total: Some(1),
1625 unit: Some("step".to_string()),
1626 label: Some("done".to_string()),
1627 })
1628 .await
1629 .map_err(ToolExecutionResult::internal_error)?;
1630
1631 Ok(BackgroundOutcome {
1632 summary: arguments["summary"].as_str().unwrap_or("done").to_string(),
1633 result: json!({"ok": true}),
1634 raw_output: None,
1635 })
1636 }
1637 }
1638
1639 #[async_trait]
1640 impl Tool for TestBackgroundTool {
1641 fn name(&self) -> &str {
1642 "test_background"
1643 }
1644
1645 fn display_name(&self) -> Option<&str> {
1646 Some("Test Background")
1647 }
1648
1649 fn description(&self) -> &str {
1650 "test tool"
1651 }
1652
1653 fn parameters_schema(&self) -> Value {
1654 json!({
1655 "type": "object",
1656 "properties": {
1657 "summary": { "type": "string" }
1658 }
1659 })
1660 }
1661
1662 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1663 ToolExecutionResult::tool_error("foreground unsupported")
1664 }
1665
1666 fn hints(&self) -> ToolHints {
1667 ToolHints::default().with_supports_background(true)
1668 }
1669
1670 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1671 Some(self)
1672 }
1673 }
1674
1675 #[derive(Default)]
1676 struct TestFailingBackgroundTool;
1677
1678 #[async_trait]
1679 impl BackgroundExecutableTool for TestFailingBackgroundTool {
1680 async fn execute_background(
1681 &self,
1682 _arguments: Value,
1683 _context: ToolContext,
1684 sink: Arc<dyn BackgroundEventSink>,
1685 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1686 sink.status("Running failing test")
1687 .await
1688 .map_err(ToolExecutionResult::internal_error)?;
1689 sink.output("stderr", "background failed")
1690 .await
1691 .map_err(ToolExecutionResult::internal_error)?;
1692 Err(ToolExecutionResult::tool_error("boom"))
1693 }
1694 }
1695
1696 #[async_trait]
1697 impl Tool for TestFailingBackgroundTool {
1698 fn name(&self) -> &str {
1699 "test_background_fail"
1700 }
1701
1702 fn display_name(&self) -> Option<&str> {
1703 Some("Test Background Fail")
1704 }
1705
1706 fn description(&self) -> &str {
1707 "failing background test tool"
1708 }
1709
1710 fn parameters_schema(&self) -> Value {
1711 json!({
1712 "type": "object",
1713 "properties": {}
1714 })
1715 }
1716
1717 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1718 ToolExecutionResult::tool_error("foreground unsupported")
1719 }
1720
1721 fn hints(&self) -> ToolHints {
1722 ToolHints::default().with_supports_background(true)
1723 }
1724
1725 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1726 Some(self)
1727 }
1728 }
1729
1730 #[derive(Default)]
1731 struct TestLargeOutputBackgroundTool;
1732
1733 #[async_trait]
1734 impl BackgroundExecutableTool for TestLargeOutputBackgroundTool {
1735 async fn execute_background(
1736 &self,
1737 _arguments: Value,
1738 _context: ToolContext,
1739 sink: Arc<dyn BackgroundEventSink>,
1740 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1741 let large_chunk = "x".repeat(MAX_BACKGROUND_OUTPUT_LOG_CHARS + 4096);
1742 sink.output("stdout", &large_chunk)
1743 .await
1744 .map_err(ToolExecutionResult::internal_error)?;
1745 Ok(BackgroundOutcome {
1746 summary: "large output complete".to_string(),
1747 result: json!({"ok": true}),
1748 raw_output: None,
1749 })
1750 }
1751 }
1752
1753 #[async_trait]
1754 impl Tool for TestLargeOutputBackgroundTool {
1755 fn name(&self) -> &str {
1756 "test_background_large_output"
1757 }
1758
1759 fn display_name(&self) -> Option<&str> {
1760 Some("Test Background Large Output")
1761 }
1762
1763 fn description(&self) -> &str {
1764 "background test tool with huge output"
1765 }
1766
1767 fn parameters_schema(&self) -> Value {
1768 json!({
1769 "type": "object",
1770 "properties": {}
1771 })
1772 }
1773
1774 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1775 ToolExecutionResult::tool_error("foreground unsupported")
1776 }
1777
1778 fn hints(&self) -> ToolHints {
1779 ToolHints::default().with_supports_background(true)
1780 }
1781
1782 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1783 Some(self)
1784 }
1785 }
1786
1787 struct BlockingBackgroundTool {
1788 release: StdArc<AtomicBool>,
1789 }
1790
1791 #[async_trait]
1792 impl BackgroundExecutableTool for BlockingBackgroundTool {
1793 async fn execute_background(
1794 &self,
1795 _arguments: Value,
1796 _context: ToolContext,
1797 sink: Arc<dyn BackgroundEventSink>,
1798 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1799 sink.status("Blocking until released")
1800 .await
1801 .map_err(ToolExecutionResult::internal_error)?;
1802 while !self.release.load(Ordering::SeqCst) {
1803 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1804 }
1805 Ok(BackgroundOutcome {
1806 summary: "released".to_string(),
1807 result: json!({"ok": true}),
1808 raw_output: None,
1809 })
1810 }
1811 }
1812
1813 #[async_trait]
1814 impl Tool for BlockingBackgroundTool {
1815 fn name(&self) -> &str {
1816 "test_background_blocking"
1817 }
1818
1819 fn display_name(&self) -> Option<&str> {
1820 Some("Test Background Blocking")
1821 }
1822
1823 fn description(&self) -> &str {
1824 "background test tool that waits for test release"
1825 }
1826
1827 fn parameters_schema(&self) -> Value {
1828 json!({
1829 "type": "object",
1830 "properties": {}
1831 })
1832 }
1833
1834 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1835 ToolExecutionResult::tool_error("foreground unsupported")
1836 }
1837
1838 fn hints(&self) -> ToolHints {
1839 ToolHints::default().with_supports_background(true)
1840 }
1841
1842 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1843 Some(self)
1844 }
1845 }
1846
1847 #[derive(Default)]
1848 struct TestSessionResourceRegistry {
1849 entries: Mutex<HashMap<String, SessionResourceEntry>>,
1850 }
1851
1852 #[async_trait]
1853 impl crate::traits::SessionResourceRegistry for TestSessionResourceRegistry {
1854 async fn register(
1855 &self,
1856 entry: RegisterSessionResource,
1857 ) -> crate::Result<SessionResourceEntry> {
1858 let stored = SessionResourceEntry {
1859 resource_id: entry.resource_id.clone(),
1860 session_id: entry.session_id,
1861 kind: entry.kind,
1862 display_name: entry.display_name,
1863 status: entry.status,
1864 metadata: entry.metadata,
1865 created_at: chrono::Utc::now(),
1866 updated_at: chrono::Utc::now(),
1867 };
1868 self.entries
1869 .lock()
1870 .unwrap()
1871 .insert(entry.resource_id, stored.clone());
1872 Ok(stored)
1873 }
1874
1875 async fn update_status(
1876 &self,
1877 _session_id: SessionId,
1878 resource_id: &str,
1879 status: SessionResourceStatus,
1880 ) -> crate::Result<Option<SessionResourceEntry>> {
1881 let mut entries = self.entries.lock().unwrap();
1882 if let Some(entry) = entries.get_mut(resource_id) {
1883 entry.status = status;
1884 entry.updated_at = chrono::Utc::now();
1885 return Ok(Some(entry.clone()));
1886 }
1887 Ok(None)
1888 }
1889
1890 async fn get(
1891 &self,
1892 _session_id: SessionId,
1893 resource_id: &str,
1894 ) -> crate::Result<Option<SessionResourceEntry>> {
1895 Ok(self.entries.lock().unwrap().get(resource_id).cloned())
1896 }
1897
1898 async fn list(
1899 &self,
1900 session_id: SessionId,
1901 filter: Option<&SessionResourceFilter>,
1902 ) -> crate::Result<Vec<SessionResourceEntry>> {
1903 Ok(self
1904 .entries
1905 .lock()
1906 .unwrap()
1907 .values()
1908 .filter(|entry| entry.session_id == session_id)
1909 .filter(|entry| {
1910 filter.is_none_or(|filter| {
1911 filter.kind.as_ref().is_none_or(|kind| &entry.kind == kind)
1912 && filter.status.is_none_or(|status| entry.status == status)
1913 })
1914 })
1915 .cloned()
1916 .collect())
1917 }
1918
1919 async fn deregister(
1920 &self,
1921 _session_id: SessionId,
1922 resource_id: &str,
1923 ) -> crate::Result<bool> {
1924 Ok(self.entries.lock().unwrap().remove(resource_id).is_some())
1925 }
1926 }
1927
1928 #[derive(Default)]
1929 struct TestFileStore {
1930 files: Mutex<HashMap<String, SessionFile>>,
1931 }
1932
1933 #[async_trait]
1934 impl crate::traits::SessionFileSystem for TestFileStore {
1935 async fn read_file(
1936 &self,
1937 _session_id: SessionId,
1938 path: &str,
1939 ) -> crate::Result<Option<SessionFile>> {
1940 Ok(self.files.lock().unwrap().get(path).cloned())
1941 }
1942
1943 async fn write_file(
1944 &self,
1945 session_id: SessionId,
1946 path: &str,
1947 content: &str,
1948 encoding: &str,
1949 ) -> crate::Result<SessionFile> {
1950 let now = chrono::Utc::now();
1951 let file = SessionFile {
1952 id: uuid::Uuid::now_v7(),
1953 session_id: session_id.uuid(),
1954 path: path.to_string(),
1955 name: FileInfo::name_from_path(path),
1956 content: Some(content.to_string()),
1957 encoding: encoding.to_string(),
1958 is_directory: false,
1959 is_readonly: false,
1960 size_bytes: content.len() as i64,
1961 created_at: now,
1962 updated_at: now,
1963 };
1964 self.files
1965 .lock()
1966 .unwrap()
1967 .insert(path.to_string(), file.clone());
1968 Ok(file)
1969 }
1970
1971 async fn delete_file(
1972 &self,
1973 _session_id: SessionId,
1974 _path: &str,
1975 _recursive: bool,
1976 ) -> crate::Result<bool> {
1977 Ok(false)
1978 }
1979
1980 async fn list_directory(
1981 &self,
1982 _session_id: SessionId,
1983 _path: &str,
1984 ) -> crate::Result<Vec<FileInfo>> {
1985 Ok(Vec::new())
1986 }
1987
1988 async fn stat_file(
1989 &self,
1990 _session_id: SessionId,
1991 path: &str,
1992 ) -> crate::Result<Option<FileStat>> {
1993 let file = self.files.lock().unwrap().get(path).cloned();
1994 Ok(file.map(|entry| FileStat {
1995 path: entry.path,
1996 name: entry.name,
1997 is_directory: entry.is_directory,
1998 is_readonly: entry.is_readonly,
1999 size_bytes: entry.size_bytes,
2000 created_at: entry.created_at,
2001 updated_at: entry.updated_at,
2002 }))
2003 }
2004
2005 async fn grep_files(
2006 &self,
2007 _session_id: SessionId,
2008 _pattern: &str,
2009 _path_pattern: Option<&str>,
2010 ) -> crate::Result<Vec<crate::session_file::GrepMatch>> {
2011 Ok(Vec::new())
2012 }
2013
2014 async fn create_directory(
2015 &self,
2016 session_id: SessionId,
2017 path: &str,
2018 ) -> crate::Result<FileInfo> {
2019 let now = chrono::Utc::now();
2020 let id = uuid::Uuid::now_v7();
2021 let dir = SessionFile {
2022 id,
2023 session_id: session_id.uuid(),
2024 path: path.to_string(),
2025 name: FileInfo::name_from_path(path),
2026 content: None,
2027 encoding: "text".to_string(),
2028 is_directory: true,
2029 is_readonly: false,
2030 size_bytes: 0,
2031 created_at: now,
2032 updated_at: now,
2033 };
2034 self.files.lock().unwrap().insert(path.to_string(), dir);
2035 Ok(FileInfo {
2036 id,
2037 session_id: session_id.uuid(),
2038 path: path.to_string(),
2039 name: FileInfo::name_from_path(path),
2040 is_directory: true,
2041 is_readonly: false,
2042 size_bytes: 0,
2043 created_at: now,
2044 updated_at: now,
2045 })
2046 }
2047 }
2048
    /// `PlatformStore` stub for these tests.
    ///
    /// `send_message` records each message's `content` in `sent_messages` so
    /// tests can inspect what was delivered. List and lookup methods return empty
    /// results, and delete methods succeed without doing anything. Create, update,
    /// copy, publish and `set_subagent_metadata` are `unreachable!()`: presumably
    /// no test here reaches them.
    #[derive(Default)]
    struct TestPlatformStore {
        // Contents passed to `send_message`, in call order.
        sent_messages: Mutex<Vec<String>>,
    }

    #[async_trait]
    impl PlatformStore for TestPlatformStore {
        // --- Harnesses ---
        async fn list_harnesses(&self) -> crate::Result<Vec<crate::Harness>> {
            Ok(Vec::new())
        }
        async fn get_harness(&self, _id: HarnessId) -> crate::Result<Option<crate::Harness>> {
            Ok(None)
        }
        async fn create_harness(
            &self,
            _name: &str,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: &str,
            _parent_harness_id: Option<HarnessId>,
            _capabilities: &[String],
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        async fn update_harness(
            &self,
            _id: HarnessId,
            _name: Option<&str>,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: Option<&str>,
            _parent_harness_id: Option<Option<HarnessId>>,
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        async fn delete_harness(&self, _id: HarnessId) -> crate::Result<()> {
            Ok(())
        }
        async fn copy_harness(
            &self,
            _id: HarnessId,
            _new_name: Option<&str>,
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        // --- Agents ---
        async fn list_agents(&self) -> crate::Result<Vec<crate::Agent>> {
            Ok(Vec::new())
        }
        async fn get_agent_by_id(&self, _id: AgentId) -> crate::Result<Option<crate::Agent>> {
            Ok(None)
        }
        async fn create_agent(
            &self,
            _name: &str,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: &str,
            _capabilities: &[String],
        ) -> crate::Result<crate::Agent> {
            unreachable!()
        }
        async fn update_agent(
            &self,
            _id: AgentId,
            _name: Option<&str>,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: Option<&str>,
        ) -> crate::Result<crate::Agent> {
            unreachable!()
        }
        async fn delete_agent(&self, _id: AgentId) -> crate::Result<()> {
            Ok(())
        }
        // --- Apps and app channels ---
        async fn list_apps(
            &self,
            _search: Option<&str>,
            _include_archived: bool,
        ) -> crate::Result<Vec<crate::App>> {
            Ok(Vec::new())
        }
        async fn get_app(&self, _id: crate::AppId) -> crate::Result<Option<crate::App>> {
            Ok(None)
        }
        async fn create_app(
            &self,
            _name: &str,
            _description: Option<&str>,
            _harness_id: HarnessId,
            _agent_id: Option<AgentId>,
            _agent_identity_id: Option<crate::AgentIdentityId>,
            _channel_type: Option<crate::ChannelType>,
            _channel_config: Option<&serde_json::Value>,
        ) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn update_app(
            &self,
            _id: crate::AppId,
            _name: Option<&str>,
            _description: Option<&str>,
            _harness_id: Option<HarnessId>,
            _agent_id: Option<AgentId>,
            _agent_identity_id: Option<Option<crate::AgentIdentityId>>,
        ) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn delete_app(&self, _id: crate::AppId) -> crate::Result<()> {
            Ok(())
        }
        async fn destroy_app(&self, _id: crate::AppId) -> crate::Result<()> {
            Ok(())
        }
        async fn publish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn unpublish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn add_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_type: crate::ChannelType,
            _channel_config: Option<&serde_json::Value>,
            _enabled: Option<bool>,
        ) -> crate::Result<crate::AppChannel> {
            unreachable!()
        }
        async fn update_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_id: crate::AppChannelId,
            _channel_type: Option<crate::ChannelType>,
            _channel_config: Option<&serde_json::Value>,
            _enabled: Option<bool>,
        ) -> crate::Result<crate::AppChannel> {
            unreachable!()
        }
        async fn delete_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_id: crate::AppChannelId,
        ) -> crate::Result<()> {
            Ok(())
        }
        // --- Sessions ---
        async fn list_sessions(
            &self,
            _limit: Option<usize>,
            _agent_id: Option<AgentId>,
        ) -> crate::Result<Vec<crate::Session>> {
            Ok(Vec::new())
        }
        async fn create_session(
            &self,
            _harness_id: HarnessId,
            _agent_id: Option<AgentId>,
            _title: Option<&str>,
            _locale: Option<&str>,
            _blueprint_id: Option<&str>,
            _blueprint_config: Option<&serde_json::Value>,
        ) -> crate::Result<crate::Session> {
            unreachable!()
        }
        async fn get_session_by_id(&self, _id: SessionId) -> crate::Result<Option<crate::Session>> {
            Ok(None)
        }
        // Synthetic, empty report tagged with the requested session id and a
        // fixed "llmsim" model name.
        async fn get_session_context_report(
            &self,
            id: SessionId,
        ) -> crate::Result<crate::SessionContextReport> {
            Ok(crate::SessionContextReport {
                session_id: id.to_string(),
                model: "llmsim".to_string(),
                context_window_tokens: None,
                estimated_input_tokens: 0,
                sections: vec![],
                contributions: vec![],
                cumulative_usage: None,
            })
        }
        async fn set_subagent_metadata(
            &self,
            _session_id: SessionId,
            _parent_session_id: SessionId,
            _subagent_name: &str,
            _subagent_task: &str,
            _subagent_status: crate::session::SubagentStatus,
        ) -> crate::Result<crate::Session> {
            unreachable!()
        }
        async fn delete_session(&self, _id: SessionId) -> crate::Result<()> {
            Ok(())
        }
        // --- Messaging ---
        // The only stateful method: records the message content for later assertions.
        async fn send_message(&self, _session_id: SessionId, content: &str) -> crate::Result<()> {
            self.sent_messages.lock().unwrap().push(content.to_string());
            Ok(())
        }
        async fn get_messages(
            &self,
            _session_id: SessionId,
            _limit: Option<usize>,
        ) -> crate::Result<Vec<PlatformMessage>> {
            Ok(Vec::new())
        }
        // Returns immediately with "idle" regardless of the timeout.
        async fn wait_for_idle(
            &self,
            _session_id: SessionId,
            _timeout_secs: Option<u64>,
        ) -> crate::Result<String> {
            Ok("idle".to_string())
        }
        // --- Misc ---
        async fn list_capabilities(
            &self,
            _search: Option<&str>,
        ) -> crate::Result<Vec<crate::CapabilityInfo>> {
            Ok(Vec::new())
        }
        fn base_url(&self) -> &str {
            "http://localhost:9300"
        }
    }
2270
    /// Session storage stub. Its `SessionStorageStore` implementation accepts
    /// every write, reports every delete as a miss, and never returns stored
    /// values, keys or secrets.
    #[derive(Default)]
    struct NoopStorageStore;
2273
2274 #[derive(Default)]
2275 struct TestScheduleStore {
2276 schedules: Mutex<Vec<crate::session_schedule::SessionSchedule>>,
2277 }
2278
2279 #[async_trait]
2280 impl crate::traits::SessionScheduleStore for TestScheduleStore {
2281 async fn create_schedule(
2282 &self,
2283 session_id: SessionId,
2284 description: String,
2285 cron_expression: Option<String>,
2286 scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
2287 timezone: String,
2288 ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2289 let schedule = crate::session_schedule::SessionSchedule {
2290 id: crate::typed_id::ScheduleId::new(),
2291 session_id,
2292 owner_principal_id: crate::PrincipalId::from_seed(1),
2293 resolved_owner_user_id: None,
2294 owner: None,
2295 effective_owner: None,
2296 description,
2297 cron_expression: cron_expression.clone(),
2298 scheduled_at,
2299 timezone,
2300 enabled: true,
2301 schedule_type: crate::session_schedule::SessionSchedule::derive_type(
2302 &cron_expression,
2303 ),
2304 next_trigger_at: Some(chrono::Utc::now() + chrono::Duration::minutes(10)),
2305 last_triggered_at: None,
2306 trigger_count: 0,
2307 created_at: chrono::Utc::now(),
2308 updated_at: chrono::Utc::now(),
2309 };
2310 self.schedules.lock().unwrap().push(schedule.clone());
2311 Ok(schedule)
2312 }
2313
2314 async fn cancel_schedule(
2315 &self,
2316 _session_id: SessionId,
2317 schedule_id: crate::ScheduleId,
2318 ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2319 let mut schedules = self.schedules.lock().unwrap();
2320 let schedule = schedules
2321 .iter_mut()
2322 .find(|schedule| schedule.id == schedule_id)
2323 .ok_or_else(|| crate::AgentLoopError::tool("Schedule not found".to_string()))?;
2324 schedule.enabled = false;
2325 Ok(schedule.clone())
2326 }
2327
2328 async fn list_schedules(
2329 &self,
2330 session_id: SessionId,
2331 ) -> crate::Result<Vec<crate::session_schedule::SessionSchedule>> {
2332 Ok(self
2333 .schedules
2334 .lock()
2335 .unwrap()
2336 .iter()
2337 .filter(|schedule| schedule.session_id == session_id)
2338 .cloned()
2339 .collect())
2340 }
2341
2342 async fn count_active_schedules(&self, session_id: SessionId) -> crate::Result<u32> {
2343 Ok(self
2344 .schedules
2345 .lock()
2346 .unwrap()
2347 .iter()
2348 .filter(|schedule| schedule.session_id == session_id && schedule.enabled)
2349 .count() as u32)
2350 }
2351 }
2352
    // Every operation is a no-op. Writes succeed, reads find nothing, deletes
    // report `false` (nothing removed), and listings are empty.
    #[async_trait]
    impl crate::traits::SessionStorageStore for NoopStorageStore {
        // --- Plain key/value storage ---
        async fn set_value(
            &self,
            _session_id: SessionId,
            _key: &str,
            _value: &str,
        ) -> crate::Result<()> {
            Ok(())
        }
        async fn get_value(
            &self,
            _session_id: SessionId,
            _key: &str,
        ) -> crate::Result<Option<String>> {
            Ok(None)
        }
        async fn delete_value(&self, _session_id: SessionId, _key: &str) -> crate::Result<bool> {
            Ok(false)
        }
        async fn list_keys(&self, _session_id: SessionId) -> crate::Result<Vec<KeyInfo>> {
            Ok(Vec::new())
        }
        // --- Secrets ---
        async fn set_secret(
            &self,
            _session_id: SessionId,
            _name: &str,
            _value: &str,
        ) -> crate::Result<()> {
            Ok(())
        }
        async fn get_secret(
            &self,
            _session_id: SessionId,
            _name: &str,
        ) -> crate::Result<Option<String>> {
            Ok(None)
        }
        async fn delete_secret(&self, _session_id: SessionId, _name: &str) -> crate::Result<bool> {
            Ok(false)
        }
        async fn list_secrets(&self, _session_id: SessionId) -> crate::Result<Vec<SecretInfo>> {
            Ok(Vec::new())
        }
    }
2398
2399 #[tokio::test]
2400 async fn test_echo_tool() {
2401 let tool = EchoTool;
2402
2403 let result = tool
2404 .execute(serde_json::json!({"message": "Hello, world!"}))
2405 .await;
2406
2407 if let ToolExecutionResult::Success(value) = result {
2408 assert_eq!(
2409 value.get("echoed").unwrap().as_str().unwrap(),
2410 "Hello, world!"
2411 );
2412 assert_eq!(value.get("length").unwrap().as_u64().unwrap(), 13);
2413 } else {
2414 panic!("Expected success");
2415 }
2416 }
2417
2418 #[tokio::test]
2419 async fn test_failing_tool_with_tool_error() {
2420 let tool = FailingTool::with_tool_error("Something went wrong");
2421
2422 let result = tool.execute(serde_json::json!({})).await;
2423
2424 if let ToolExecutionResult::ToolError(msg) = result {
2425 assert_eq!(msg, "Something went wrong");
2426 } else {
2427 panic!("Expected tool error");
2428 }
2429 }
2430
2431 #[tokio::test]
2432 async fn test_failing_tool_with_internal_error() {
2433 let tool = FailingTool::with_internal_error("Database connection failed");
2434
2435 let result = tool.execute(serde_json::json!({})).await;
2436
2437 if let ToolExecutionResult::InternalError(err) = result {
2438 assert_eq!(err.message, "Database connection failed");
2439 } else {
2440 panic!("Expected internal error");
2441 }
2442 }
2443
2444 #[tokio::test]
2445 async fn test_tool_result_conversion() {
2446 let result = ToolExecutionResult::success(serde_json::json!({"value": 42}));
2448 let tool_result = result.into_tool_result("call_1", "test_tool");
2449 assert!(tool_result.error.is_none());
2450 assert_eq!(tool_result.result.unwrap()["value"], 42);
2451
2452 let result = ToolExecutionResult::tool_error("Invalid input");
2454 let tool_result = result.into_tool_result("call_2", "test_tool");
2455 assert_eq!(tool_result.error.as_deref(), Some("Invalid input"));
2456 assert_eq!(
2457 tool_result.result.unwrap(),
2458 serde_json::json!({"error": "Invalid input"})
2459 );
2460
2461 let result = ToolExecutionResult::internal_error_msg("Secret database error");
2463 let tool_result = result.into_tool_result("call_3", "test_tool");
2464 assert_eq!(
2465 tool_result.error.as_deref(),
2466 Some("An internal error occurred while executing the tool")
2467 );
2468 assert_eq!(
2469 tool_result.result.unwrap(),
2470 serde_json::json!({"error": "An internal error occurred while executing the tool"})
2471 );
2472 }
2473
2474 #[tokio::test]
2475 async fn test_tool_registry() {
2476 let mut registry = ToolRegistry::new();
2477 registry.register(GetCurrentTimeTool);
2478 registry.register(EchoTool);
2479
2480 assert_eq!(registry.len(), 2);
2481 assert!(registry.has("get_current_time"));
2482 assert!(registry.has("echo"));
2483 assert!(!registry.has("nonexistent"));
2484
2485 let definitions = registry.tool_definitions();
2486 assert_eq!(definitions.len(), 2);
2487 }
2488
2489 #[tokio::test]
2490 async fn test_tool_registry_builder() {
2491 let registry = ToolRegistry::builder()
2492 .tool(GetCurrentTimeTool)
2493 .tool(EchoTool)
2494 .build();
2495
2496 assert_eq!(registry.len(), 2);
2497 }
2498
2499 #[test]
2500 fn test_tool_display_name_in_definition() {
2501 let tool = GetCurrentTimeTool;
2503 assert_eq!(tool.display_name(), Some("Get Current Time"));
2504
2505 let def = tool.to_definition();
2506 assert_eq!(def.display_name(), Some("Get Current Time"));
2507 }
2508
2509 #[test]
2510 fn test_success_with_raw_output_object_preserves_shape() {
2511 let res = ToolExecutionResult::success_with_raw_output(
2512 serde_json::json!({"stdout": "hello"}),
2513 "raw stdout bytes".to_string(),
2514 );
2515 let tr = res.into_tool_result("call_1", "demo");
2516 assert_eq!(tr.result.as_ref().unwrap()["stdout"], "hello");
2517 assert!(
2518 tr.result
2519 .as_ref()
2520 .unwrap()
2521 .as_object()
2522 .unwrap()
2523 .get("_raw_output")
2524 .is_none(),
2525 "sidecar key must not leak to the LLM-visible result"
2526 );
2527 assert_eq!(tr.raw_output.as_deref(), Some("raw stdout bytes"));
2528 }
2529
2530 #[test]
2531 fn test_success_with_raw_output_scalar_unwraps_to_string() {
2532 let res = ToolExecutionResult::success_with_raw_output(
2533 "compact summary".to_string(),
2534 "full output bytes".to_string(),
2535 );
2536 let tr = res.into_tool_result("call_1", "demo");
2537 assert_eq!(
2538 tr.result,
2539 Some(serde_json::Value::String("compact summary".into()))
2540 );
2541 assert_eq!(tr.raw_output.as_deref(), Some("full output bytes"));
2542 }
2543
2544 #[test]
2545 fn test_success_result_with_raw_output_scalar_key_is_not_unwrapped() {
2546 let res = ToolExecutionResult::success(
2547 serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}),
2548 );
2549 let tr = res.into_tool_result("call_1", "demo");
2550 assert_eq!(
2551 tr.result,
2552 Some(serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}))
2553 );
2554 assert_eq!(tr.raw_output, None);
2555 }
2556
2557 #[test]
2558 fn test_success_result_with_only_raw_output_scalar_key_is_not_unwrapped() {
2559 let res = ToolExecutionResult::success(serde_json::json!({"_raw_output_scalar": "v"}));
2562 let tr = res.into_tool_result("call_1", "demo");
2563 assert_eq!(
2564 tr.result,
2565 Some(serde_json::json!({"_raw_output_scalar": "v"}))
2566 );
2567 assert_eq!(tr.raw_output, None);
2568 }
2569
2570 #[test]
2571 fn test_echo_tool_display_name() {
2572 let tool = EchoTool;
2573 assert_eq!(tool.display_name(), Some("Echo"));
2574
2575 let def = tool.to_definition();
2576 assert_eq!(def.display_name(), Some("Echo"));
2577 }
2578
2579 #[test]
2580 fn test_all_default_tools_have_display_names() {
2581 let registry = ToolRegistry::with_defaults();
2582 let definitions = registry.tool_definitions();
2583
2584 for def in &definitions {
2585 assert!(
2586 def.display_name().is_some(),
2587 "Tool '{}' should have a display_name",
2588 def.name()
2589 );
2590 }
2591 }
2592
2593 #[tokio::test]
2594 async fn test_tool_registry_as_executor() {
2595 let mut registry = ToolRegistry::new();
2596 registry.register(EchoTool);
2597
2598 let tool_call = ToolCall {
2599 id: "call_1".to_string(),
2600 name: "echo".to_string(),
2601 arguments: serde_json::json!({"message": "test"}),
2602 };
2603
2604 let tool_def = registry.get("echo").unwrap().to_definition();
2605 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2606
2607 assert!(result.error.is_none());
2608 assert_eq!(result.result.unwrap()["echoed"], "test");
2609 }
2610
2611 #[test]
2612 fn test_tool_to_definition() {
2613 let tool = GetCurrentTimeTool;
2614 let def = tool.to_definition();
2615
2616 let ToolDefinition::Builtin(builtin) = def else {
2617 panic!("expected Builtin variant");
2618 };
2619 assert_eq!(builtin.name, "get_current_time");
2620 assert_eq!(builtin.policy, ToolPolicy::Auto);
2621 }
2622
2623 #[test]
2624 fn test_with_defaults_has_expected_tools() {
2625 let registry = ToolRegistry::with_defaults();
2626
2627 assert!(
2629 registry.has("get_current_time"),
2630 "should have get_current_time"
2631 );
2632 assert!(registry.has("echo"), "should have echo");
2633 assert!(
2636 !registry.has("spawn_background"),
2637 "spawn_background must NOT be in defaults — it comes from the \
2638 background_execution capability"
2639 );
2640 assert!(
2641 registry.has("report_progress"),
2642 "should have report_progress"
2643 );
2644
2645 assert!(registry.has("add"), "should have add");
2647 assert!(registry.has("subtract"), "should have subtract");
2648 assert!(registry.has("multiply"), "should have multiply");
2649 assert!(registry.has("divide"), "should have divide");
2650
2651 assert!(registry.has("get_weather"), "should have get_weather");
2653 assert!(registry.has("get_forecast"), "should have get_forecast");
2654
2655 assert!(registry.has("write_todos"), "should have write_todos");
2657
2658 assert!(registry.has("read_file"), "should have read_file");
2660 assert!(registry.has("write_file"), "should have write_file");
2661 assert!(registry.has("edit_file"), "should have edit_file");
2662 assert!(registry.has("list_directory"), "should have list_directory");
2663 assert!(registry.has("grep_files"), "should have grep_files");
2664 assert!(registry.has("delete_file"), "should have delete_file");
2665 assert!(registry.has("stat_file"), "should have stat_file");
2666
2667 assert!(registry.has("web_fetch"), "should have web_fetch");
2669
2670 assert_eq!(registry.len(), 18, "should have 18 default tools");
2672 }
2673
2674 #[tokio::test]
2675 async fn test_with_defaults_tools_are_executable() {
2676 let registry = ToolRegistry::with_defaults();
2677
2678 let tool_call = ToolCall {
2680 id: "call_1".to_string(),
2681 name: "echo".to_string(),
2682 arguments: serde_json::json!({"message": "hello from defaults"}),
2683 };
2684
2685 let tool_def = registry.get("echo").unwrap().to_definition();
2686 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2687
2688 assert!(result.error.is_none());
2689 assert_eq!(result.result.unwrap()["echoed"], "hello from defaults");
2690 }
2691
2692 #[tokio::test]
2693 async fn test_with_defaults_math_tools() {
2694 let registry = ToolRegistry::with_defaults();
2695
2696 let tool_call = ToolCall {
2698 id: "call_add".to_string(),
2699 name: "add".to_string(),
2700 arguments: serde_json::json!({"a": 5, "b": 3}),
2701 };
2702
2703 let tool_def = registry.get("add").unwrap().to_definition();
2704 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2705
2706 assert!(result.error.is_none());
2707 assert_eq!(result.result.unwrap()["result"].as_f64().unwrap(), 8.0);
2709 }
2710
2711 #[test]
2715 fn test_with_defaults_excludes_capability_only_tools() {
2716 let registry = ToolRegistry::with_defaults();
2717
2718 assert!(
2720 !registry.has("bash"),
2721 "bash must not be in defaults — it comes from virtual_bash capability"
2722 );
2723 assert!(
2725 !registry.has("kv_store"),
2726 "kv_store must not be in defaults — it comes from session_storage capability"
2727 );
2728 assert!(
2732 !registry.has("spawn_background"),
2733 "spawn_background must not be in defaults — it comes from the \
2734 background_execution capability (auto-activated by tool hints)"
2735 );
2736 }
2737
2738 #[tokio::test]
2739 async fn test_spawn_background_executes_and_signals_session() {
2740 let session_id = SessionId::new();
2741 let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2742 let file_store = Arc::new(TestFileStore::default());
2743 let platform_store = Arc::new(TestPlatformStore::default());
2744 let storage_store = Arc::new(NoopStorageStore);
2745 let tool_registry = ToolRegistry::builder()
2746 .tool(SpawnBackgroundTool)
2747 .tool(TestBackgroundTool)
2748 .build();
2749
2750 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
2751 .with_tool_registry(Arc::new(tool_registry))
2752 .with_platform_store(platform_store.clone())
2753 .with_session_resource_registry(resource_registry.clone());
2754
2755 let tool = SpawnBackgroundTool;
2756 let result = tool
2757 .execute_with_context(
2758 json!({
2759 "tool": "test_background",
2760 "args": { "summary": "Background complete" }
2761 }),
2762 &context,
2763 )
2764 .await;
2765
2766 let ToolExecutionResult::Success(value) = result else {
2767 panic!("spawn_background should succeed");
2768 };
2769 let run_id = value["run_id"].as_str().unwrap().to_string();
2770
2771 tokio::time::timeout(std::time::Duration::from_secs(2), async {
2772 loop {
2773 let entry = resource_registry
2774 .get(session_id, &run_id)
2775 .await
2776 .unwrap()
2777 .expect("resource exists");
2778 if entry.status == SessionResourceStatus::Completed {
2779 break entry;
2780 }
2781 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2782 }
2783 })
2784 .await
2785 .expect("background run should complete");
2786
2787 let messages = platform_store.sent_messages.lock().unwrap().clone();
2788 assert_eq!(messages.len(), 1);
2789 assert!(messages[0].contains("Background run completed"));
2790 assert!(messages[0].contains(&run_id));
2791
2792 let log_file = file_store
2793 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
2794 .await
2795 .unwrap()
2796 .expect("log file");
2797 assert!(
2798 log_file
2799 .content
2800 .as_deref()
2801 .unwrap_or_default()
2802 .contains("hello from background")
2803 );
2804 }
2805
2806 #[tokio::test]
2807 async fn test_spawn_background_persists_failure_artifacts() {
2808 let session_id = SessionId::new();
2809 let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2810 let file_store = Arc::new(TestFileStore::default());
2811 let storage_store = Arc::new(NoopStorageStore);
2812 let tool_registry = ToolRegistry::builder()
2813 .tool(SpawnBackgroundTool)
2814 .tool(TestFailingBackgroundTool)
2815 .build();
2816
2817 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
2818 .with_tool_registry(Arc::new(tool_registry))
2819 .with_session_resource_registry(resource_registry.clone());
2820
2821 let result = SpawnBackgroundTool
2822 .execute_with_context(
2823 json!({
2824 "tool": "test_background_fail",
2825 "args": {}
2826 }),
2827 &context,
2828 )
2829 .await;
2830
2831 let ToolExecutionResult::Success(value) = result else {
2832 panic!("spawn_background should succeed");
2833 };
2834 let run_id = value["run_id"].as_str().unwrap().to_string();
2835
2836 tokio::time::timeout(std::time::Duration::from_secs(2), async {
2837 loop {
2838 let entry = resource_registry
2839 .get(session_id, &run_id)
2840 .await
2841 .unwrap()
2842 .expect("resource exists");
2843 if entry.status == SessionResourceStatus::Failed {
2844 break entry;
2845 }
2846 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2847 }
2848 })
2849 .await
2850 .expect("background run should fail");
2851
2852 let log_file = file_store
2853 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
2854 .await
2855 .unwrap()
2856 .expect("log file");
2857 assert!(
2858 log_file
2859 .content
2860 .as_deref()
2861 .unwrap_or_default()
2862 .contains("background failed")
2863 );
2864
2865 let result_file = file_store
2866 .read_file(session_id, &format!("/.background/{run_id}/result.json"))
2867 .await
2868 .unwrap()
2869 .expect("result file");
2870 let result_json: Value =
2871 serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
2872 .expect("valid json");
2873 assert_eq!(result_json["status"], "failed");
2874 assert_eq!(result_json["error"], "boom");
2875 }
2876
2877 #[tokio::test]
2878 async fn test_spawn_background_rejects_when_session_active_run_limit_reached() {
2879 let session_id = SessionId::new();
2880 let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2881 let file_store = Arc::new(TestFileStore::default());
2882 let storage_store = Arc::new(NoopStorageStore);
2883 let release = StdArc::new(AtomicBool::new(false));
2884 let tool_registry = ToolRegistry::builder()
2885 .tool(SpawnBackgroundTool)
2886 .tool(BlockingBackgroundTool {
2887 release: release.clone(),
2888 })
2889 .build();
2890
2891 let context = ToolContext::with_stores(session_id, file_store, storage_store)
2892 .with_tool_registry(Arc::new(tool_registry))
2893 .with_session_resource_registry(resource_registry.clone());
2894
2895 let mut run_ids = Vec::new();
2896 for _ in 0..MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
2897 let result = SpawnBackgroundTool
2898 .execute_with_context(
2899 json!({
2900 "tool": "test_background_blocking",
2901 "args": {}
2902 }),
2903 &context,
2904 )
2905 .await;
2906
2907 let ToolExecutionResult::Success(value) = result else {
2908 panic!("background run below the session limit should start");
2909 };
2910 run_ids.push(value["run_id"].as_str().unwrap().to_string());
2911 }
2912
2913 tokio::time::timeout(std::time::Duration::from_secs(2), async {
2914 loop {
2915 let active_runs = resource_registry
2916 .list(
2917 session_id,
2918 Some(&SessionResourceFilter {
2919 kind: Some("background_run".to_string()),
2920 status: Some(SessionResourceStatus::Active),
2921 }),
2922 )
2923 .await
2924 .unwrap();
2925 if active_runs.len() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
2926 break;
2927 }
2928 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2929 }
2930 })
2931 .await
2932 .expect("background runs should become active");
2933
2934 let result = SpawnBackgroundTool
2935 .execute_with_context(
2936 json!({
2937 "tool": "test_background_blocking",
2938 "args": {}
2939 }),
2940 &context,
2941 )
2942 .await;
2943
2944 let ToolExecutionResult::ToolError(message) = result else {
2945 release.store(true, Ordering::SeqCst);
2946 panic!("spawn_background should reject once the session limit is reached");
2947 };
2948 assert!(message.contains("active background runs per session"));
2949
2950 release.store(true, Ordering::SeqCst);
2951 tokio::time::timeout(std::time::Duration::from_secs(2), async {
2952 for run_id in run_ids {
2953 loop {
2954 let entry = resource_registry
2955 .get(session_id, &run_id)
2956 .await
2957 .unwrap()
2958 .expect("resource exists");
2959 if entry.status == SessionResourceStatus::Completed {
2960 break;
2961 }
2962 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2963 }
2964 }
2965 })
2966 .await
2967 .expect("blocking background runs should complete after release");
2968 }
2969
2970 #[tokio::test]
2971 async fn test_spawn_background_requires_file_store() {
2972 let session_id = SessionId::new();
2973 let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2974 let storage_store = Arc::new(NoopStorageStore);
2975 let tool_registry = ToolRegistry::builder()
2976 .tool(SpawnBackgroundTool)
2977 .tool(TestBackgroundTool)
2978 .build();
2979
2980 let context = ToolContext::with_storage_store(session_id, storage_store)
2981 .with_tool_registry(Arc::new(tool_registry))
2982 .with_session_resource_registry(resource_registry);
2983
2984 let result = SpawnBackgroundTool
2985 .execute_with_context(
2986 json!({
2987 "tool": "test_background",
2988 "args": {}
2989 }),
2990 &context,
2991 )
2992 .await;
2993
2994 let ToolExecutionResult::ToolError(message) = result else {
2995 panic!("spawn_background should reject missing file store");
2996 };
2997 assert!(message.contains("Session file store not available"));
2998 }
2999
3000 #[tokio::test]
3001 async fn test_spawn_background_caps_output_log_size() {
3002 let session_id = SessionId::new();
3003 let resource_registry = Arc::new(TestSessionResourceRegistry::default());
3004 let file_store = Arc::new(TestFileStore::default());
3005 let storage_store = Arc::new(NoopStorageStore);
3006 let tool_registry = ToolRegistry::builder()
3007 .tool(SpawnBackgroundTool)
3008 .tool(TestLargeOutputBackgroundTool)
3009 .build();
3010
3011 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3012 .with_tool_registry(Arc::new(tool_registry))
3013 .with_session_resource_registry(resource_registry.clone());
3014
3015 let result = SpawnBackgroundTool
3016 .execute_with_context(
3017 json!({
3018 "tool": "test_background_large_output",
3019 "args": {}
3020 }),
3021 &context,
3022 )
3023 .await;
3024
3025 let ToolExecutionResult::Success(value) = result else {
3026 panic!("spawn_background should succeed");
3027 };
3028 let run_id = value["run_id"].as_str().unwrap().to_string();
3029
3030 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3031 loop {
3032 let entry = resource_registry
3033 .get(session_id, &run_id)
3034 .await
3035 .unwrap()
3036 .expect("resource exists");
3037 if entry.status == SessionResourceStatus::Completed {
3038 break;
3039 }
3040 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3041 }
3042 })
3043 .await
3044 .expect("background run should complete");
3045
3046 let log_content = file_store
3047 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3048 .await
3049 .unwrap()
3050 .expect("log file")
3051 .content
3052 .unwrap_or_default();
3053
3054 assert!(log_content.contains("[system] background output truncated"));
3055 assert!(log_content.chars().count() <= MAX_BACKGROUND_OUTPUT_LOG_CHARS + 128);
3056 }
3057
3058 #[tokio::test]
3059 async fn test_spawn_background_can_create_scheduled_monitor() {
3060 let session_id = SessionId::new();
3061 let schedule_store = Arc::new(TestScheduleStore::default());
3062 let storage_store = Arc::new(NoopStorageStore);
3063 let tool_registry = ToolRegistry::builder()
3064 .tool(SpawnBackgroundTool)
3065 .tool(TestBackgroundTool)
3066 .build();
3067
3068 let context = ToolContext::with_storage_store(session_id, storage_store)
3069 .with_tool_registry(Arc::new(tool_registry))
3070 .with_schedule_store(schedule_store.clone());
3071
3072 let result = SpawnBackgroundTool
3073 .execute_with_context(
3074 json!({
3075 "tool": "test_background",
3076 "title": "Watch PR 1319",
3077 "args": { "summary": "Background complete" },
3078 "schedule": {
3079 "cron_expression": "*/10 * * * *",
3080 "timezone": "America/Chicago"
3081 }
3082 }),
3083 &context,
3084 )
3085 .await;
3086
3087 let ToolExecutionResult::Success(value) = result else {
3088 panic!("spawn_background should create a schedule: {result:?}");
3089 };
3090
3091 assert_eq!(value["status"], "scheduled");
3092 assert_eq!(value["title"], "Watch PR 1319");
3093 assert_eq!(value["cron_expression"], "*/10 * * * *");
3094 assert_eq!(value["timezone"], "America/Chicago");
3095
3096 let schedules = schedule_store.list_schedules(session_id).await.unwrap();
3097 assert_eq!(schedules.len(), 1);
3098 assert_eq!(
3099 schedules[0].cron_expression.as_deref(),
3100 Some("*/10 * * * *")
3101 );
3102 assert!(schedules[0].description.contains("Monitor: Watch PR 1319"));
3103 assert!(
3104 schedules[0]
3105 .description
3106 .contains("\"summary\": \"Background complete\"")
3107 );
3108 }
3109
3110 #[tokio::test]
3111 async fn test_spawn_background_rejects_invalid_scheduled_at() {
3112 let session_id = SessionId::new();
3113 let storage_store = Arc::new(NoopStorageStore);
3114 let tool_registry = ToolRegistry::builder()
3115 .tool(SpawnBackgroundTool)
3116 .tool(TestBackgroundTool)
3117 .build();
3118 let context = ToolContext::with_storage_store(session_id, storage_store)
3119 .with_tool_registry(Arc::new(tool_registry));
3120
3121 let result = SpawnBackgroundTool
3122 .execute_with_context(
3123 json!({
3124 "tool": "test_background",
3125 "args": {},
3126 "schedule": {
3127 "scheduled_at": "tomorrow at noon"
3128 }
3129 }),
3130 &context,
3131 )
3132 .await;
3133
3134 let ToolExecutionResult::ToolError(message) = result else {
3135 panic!("spawn_background should reject invalid scheduled_at");
3136 };
3137 assert!(message.contains("scheduled_at must be RFC3339"));
3138 }
3139
3140 #[tokio::test]
3141 async fn test_spawn_background_rejects_ambiguous_schedule_shape() {
3142 let session_id = SessionId::new();
3143 let storage_store = Arc::new(NoopStorageStore);
3144 let tool_registry = ToolRegistry::builder()
3145 .tool(SpawnBackgroundTool)
3146 .tool(TestBackgroundTool)
3147 .build();
3148 let context = ToolContext::with_storage_store(session_id, storage_store)
3149 .with_tool_registry(Arc::new(tool_registry));
3150
3151 let result = SpawnBackgroundTool
3152 .execute_with_context(
3153 json!({
3154 "tool": "test_background",
3155 "args": {},
3156 "schedule": {
3157 "cron_expression": "*/10 * * * *",
3158 "scheduled_at": "2026-04-16T15:30:00Z"
3159 }
3160 }),
3161 &context,
3162 )
3163 .await;
3164
3165 let ToolExecutionResult::ToolError(message) = result else {
3166 panic!("spawn_background should reject ambiguous schedule shape");
3167 };
3168 assert!(message.contains("must not include both cron_expression and scheduled_at"));
3169 }
3170}