1use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use tracing::error;
19
20use crate::background::{
21 BackgroundEventSink, BackgroundExecutableTool, BackgroundOutcome, BackgroundProgress,
22};
23use crate::session_schedule::MAX_ACTIVE_SCHEDULES_PER_SESSION;
24use crate::tool_types::{
25 BuiltinTool, DeferrablePolicy, ToolCall, ToolDefinition, ToolHints, ToolPolicy, ToolResult,
26};
27use crate::traits::ToolContext;
28use crate::typed_id::SessionId;
29use tokio::sync::{OwnedSemaphorePermit, Semaphore};
30
31use crate::error::Result;
32use crate::traits::ToolExecutor;
33
/// Maximum number of background runs a single session may have active at once.
///
/// Used as the permit count of every per-session semaphore.
pub const MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION: usize = 5;

/// Maximum number of background runs that may be active at once in this process.
const MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER: usize = 64;
/// Process-wide semaphore whose permits bound the number of concurrent background runs.
static ACTIVE_BACKGROUND_RUNS_PER_WORKER: Semaphore =
    Semaphore::const_new(MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER);

/// Lazily-initialised map from session id to that session's semaphore.
///
/// Entries are inserted on first acquisition and removed again by
/// `SessionBackgroundPermit`'s `Drop` impl once the session holds no permits.
static SESSION_BACKGROUND_PERMITS: OnceLock<std::sync::Mutex<HashMap<SessionId, Arc<Semaphore>>>> =
    OnceLock::new();
55
/// Guard for one slot in a session's background-run semaphore.
///
/// Dropping the guard releases the slot and, when the session has no other
/// outstanding guards, removes the session's entry from
/// `SESSION_BACKGROUND_PERMITS`.
struct SessionBackgroundPermit {
    /// Session whose semaphore this permit was taken from.
    session_id: SessionId,
    /// Handle to the session's semaphore; compared by pointer with the map entry on drop.
    semaphore: Arc<Semaphore>,
    /// The held permit. Wrapped in `Option` so `drop` can release it before
    /// inspecting the semaphore.
    permit: Option<OwnedSemaphorePermit>,
}
61
62impl Drop for SessionBackgroundPermit {
63 fn drop(&mut self) {
64 drop(self.permit.take());
65
66 let Some(permits) = SESSION_BACKGROUND_PERMITS.get() else {
67 return;
68 };
69 let mut permits = permits.lock().unwrap();
70 let should_remove = permits.get(&self.session_id).is_some_and(|current| {
71 Arc::ptr_eq(current, &self.semaphore)
72 && self.semaphore.available_permits() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION
73 && Arc::strong_count(&self.semaphore) == 2
74 });
75 if should_remove {
76 permits.remove(&self.session_id);
77 }
78 }
79}
80
81fn try_acquire_session_background_permit(
82 session_id: SessionId,
83) -> std::result::Result<SessionBackgroundPermit, tokio::sync::TryAcquireError> {
84 let permits = SESSION_BACKGROUND_PERMITS.get_or_init(Default::default);
85 let mut permits = permits.lock().unwrap();
86 let semaphore = permits
87 .entry(session_id)
88 .or_insert_with(|| Arc::new(Semaphore::new(MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION)))
89 .clone();
90 let permit = semaphore.clone().try_acquire_owned()?;
91
92 Ok(SessionBackgroundPermit {
93 session_id,
94 semaphore,
95 permit: Some(permit),
96 })
97}
98
/// Test helper: reports whether the permit map currently has an entry for `session_id`.
#[cfg(test)]
fn has_session_background_permits(session_id: SessionId) -> bool {
    match SESSION_BACKGROUND_PERMITS.get() {
        Some(permits) => permits.lock().unwrap().contains_key(&session_id),
        None => false,
    }
}
106
/// An image attached to a tool result.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ToolResultImage {
    /// Image payload; going by the field name, base64-encoded data.
    pub base64: String,
    /// Media type of the payload (presumably a MIME type such as `image/png` — confirm with producers).
    pub media_type: String,
}
122
/// Outcome of running a tool, before it is converted into a `ToolResult`
/// by `into_tool_result`.
#[derive(Debug)]
pub enum ToolExecutionResult {
    /// The tool succeeded and produced a JSON value.
    Success(Value),

    /// The tool succeeded and produced a JSON value plus attached images.
    SuccessWithImages {
        /// JSON result of the tool.
        result: Value,
        /// Images to attach; an empty list is reported as no images.
        images: Vec<ToolResultImage>,
    },

    /// The tool failed with a message that is copied verbatim into the `ToolResult`.
    ToolError(String),

    /// The tool failed internally; the details are logged and only a generic
    /// message is placed in the `ToolResult`.
    InternalError(ToolInternalError),

    /// The tool cannot run until a connection to `provider` exists.
    ConnectionRequired {
        /// Provider that must be connected.
        provider: String,
    },
}
175
176impl ToolExecutionResult {
177 pub fn success(value: impl Into<Value>) -> Self {
179 ToolExecutionResult::Success(value.into())
180 }
181
182 pub fn success_with_raw_output(value: impl Into<Value>, raw_output: String) -> Self {
185 let mut value = value.into();
186 match value.as_object_mut() {
190 Some(obj) => {
191 obj.insert("_raw_output".to_string(), Value::String(raw_output));
192 }
193 None => {
194 value = serde_json::json!({
195 "_raw_output_scalar": value,
196 "_raw_output": raw_output,
197 });
198 }
199 }
200 ToolExecutionResult::Success(value)
201 }
202
203 pub fn success_with_images(value: impl Into<Value>, images: Vec<ToolResultImage>) -> Self {
205 ToolExecutionResult::SuccessWithImages {
206 result: value.into(),
207 images,
208 }
209 }
210
211 pub fn tool_error(message: impl Into<String>) -> Self {
213 ToolExecutionResult::ToolError(message.into())
214 }
215
216 pub fn internal_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
218 ToolExecutionResult::InternalError(ToolInternalError::new(error))
219 }
220
221 pub fn internal_error_msg(message: impl Into<String>) -> Self {
223 ToolExecutionResult::InternalError(ToolInternalError::from_message(message))
224 }
225
226 pub fn connection_required(provider: impl Into<String>) -> Self {
228 ToolExecutionResult::ConnectionRequired {
229 provider: provider.into(),
230 }
231 }
232
233 pub fn is_success(&self) -> bool {
235 matches!(
236 self,
237 ToolExecutionResult::Success(_) | ToolExecutionResult::SuccessWithImages { .. }
238 )
239 }
240
241 pub fn is_error(&self) -> bool {
243 matches!(
244 self,
245 ToolExecutionResult::ToolError(_) | ToolExecutionResult::InternalError(_)
246 )
247 }
248
249 pub fn is_connection_required(&self) -> bool {
251 matches!(self, ToolExecutionResult::ConnectionRequired { .. })
252 }
253
254 pub fn into_tool_result(self, tool_call_id: &str, tool_name: &str) -> ToolResult {
262 match self {
263 ToolExecutionResult::Success(mut value) => {
264 let raw_output = value
266 .as_object_mut()
267 .and_then(|obj| obj.remove("_raw_output"))
268 .and_then(|v| v.as_str().map(|s| s.to_string()));
269 let result_value = if let Some(obj) = value.as_object_mut() {
272 let is_scalar_carrier = raw_output.is_some()
273 && obj.len() == 1
274 && obj.contains_key("_raw_output_scalar");
275 if is_scalar_carrier {
276 obj.remove("_raw_output_scalar").unwrap_or(Value::Null)
277 } else {
278 value
279 }
280 } else {
281 value
282 };
283 ToolResult {
284 tool_call_id: tool_call_id.to_string(),
285 result: Some(result_value),
286 images: None,
287 error: None,
288 connection_required: None,
289 raw_output,
290 }
291 }
292 ToolExecutionResult::SuccessWithImages { result, images } => ToolResult {
293 tool_call_id: tool_call_id.to_string(),
294 result: Some(result),
295 images: if images.is_empty() {
296 None
297 } else {
298 Some(images)
299 },
300 error: None,
301 connection_required: None,
302 raw_output: None,
303 },
304 ToolExecutionResult::ToolError(message) => ToolResult {
305 tool_call_id: tool_call_id.to_string(),
306 result: Some(serde_json::json!({ "error": &message })),
307 images: None,
308 error: Some(message),
309 connection_required: None,
310 raw_output: None,
311 },
312 ToolExecutionResult::InternalError(err) => {
313 error!(
315 tool_name = %tool_name,
316 tool_call_id = %tool_call_id,
317 error = %err.message,
318 error_chain = %err.chain_string(),
319 "Tool internal error (details hidden from LLM)"
320 );
321
322 let generic_msg = "An internal error occurred while executing the tool";
324 ToolResult {
325 tool_call_id: tool_call_id.to_string(),
326 result: Some(serde_json::json!({
327 "error": generic_msg
328 })),
329 images: None,
330 error: Some(generic_msg.to_string()),
331 connection_required: None,
332 raw_output: None,
333 }
334 }
335 ToolExecutionResult::ConnectionRequired { ref provider } => ToolResult {
336 tool_call_id: tool_call_id.to_string(),
337 result: Some(serde_json::json!({
338 "connection_required": provider,
339 })),
340 images: None,
341 error: None,
342 connection_required: Some(provider.clone()),
343 raw_output: None,
344 },
345 }
346 }
347}
348
/// An internal tool failure: a message plus an optional underlying error.
#[derive(Debug)]
pub struct ToolInternalError {
    /// Human-readable description of the failure.
    pub message: String,
    /// Underlying error, when the failure wraps one.
    pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
}

impl ToolInternalError {
    /// Wraps `error`, using its `Display` text as the message and keeping it as the source.
    pub fn new(error: impl std::error::Error + Send + Sync + 'static) -> Self {
        let message = error.to_string();
        Self {
            message,
            source: Some(Box::new(error)),
        }
    }

    /// Creates an error that has a message but no source.
    pub fn from_message(message: impl Into<String>) -> Self {
        let message = message.into();
        Self {
            message,
            source: None,
        }
    }

    /// Renders the message followed by every source in the chain, joined by `": "`.
    ///
    /// A source whose text equals the previously pushed entry is skipped, so
    /// an error built with `new` does not print its message twice.
    pub fn chain_string(&self) -> String {
        let causes = std::iter::successors(std::error::Error::source(self), |cause| cause.source());
        let mut parts = vec![self.message.clone()];
        for cause in causes {
            let text = cause.to_string();
            if parts.last() != Some(&text) {
                parts.push(text);
            }
        }
        parts.join(": ")
    }
}

impl std::fmt::Display for ToolInternalError {
    /// Writes only the message; sources are exposed through `chain_string`.
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        f.write_str(&self.message)
    }
}

impl std::error::Error for ToolInternalError {
    /// Returns the wrapped error, if any.
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        let inner: &(dyn std::error::Error + Send + Sync + 'static) = self.source.as_deref()?;
        Some(inner)
    }
}
402
/// A tool that can be described to callers and executed with JSON arguments.
///
/// Implementors must provide `name`, `description`, `parameters_schema` and
/// `execute`; every other method has a default implementation.
#[async_trait]
pub trait Tool: Send + Sync {
    /// Name of the tool; `ToolRegistry` uses it as the registration key.
    fn name(&self) -> &str;

    /// Optional human-readable name. Defaults to `None`.
    fn display_name(&self) -> Option<&str> {
        None
    }

    /// Description copied into the tool's definition.
    fn description(&self) -> &str;

    /// Schema of the tool's arguments, copied into the definition's `parameters`.
    fn parameters_schema(&self) -> Value;

    /// Runs the tool with `arguments` and no context.
    async fn execute(&self, arguments: Value) -> ToolExecutionResult;

    /// Runs the tool with a `ToolContext`.
    ///
    /// The default implementation ignores the context and delegates to `execute`.
    async fn execute_with_context(
        &self,
        arguments: Value,
        _context: &ToolContext,
    ) -> ToolExecutionResult {
        self.execute(arguments).await
    }

    /// Whether the tool needs a `ToolContext` to do its work. Defaults to `false`.
    fn requires_context(&self) -> bool {
        false
    }

    /// Policy placed in the tool's definition. Defaults to `ToolPolicy::Auto`.
    fn policy(&self) -> ToolPolicy {
        ToolPolicy::Auto
    }

    /// Hints placed in the tool's definition. Defaults to `ToolHints::default()`.
    fn hints(&self) -> ToolHints {
        ToolHints::default()
    }

    /// Optional narration text for a tool call in the given phase and locale.
    ///
    /// The default implementation provides none.
    fn narrate(
        &self,
        _tool_call: &crate::tool_types::ToolCall,
        _phase: crate::tool_narration::ToolNarrationPhase,
        _locale: Option<&str>,
    ) -> Option<String> {
        None
    }

    /// Returns the tool's background executor, if it has one. Defaults to `None`.
    fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
        None
    }

    /// Deferral policy placed in the tool's definition.
    /// Defaults to `DeferrablePolicy::default()`.
    fn deferrable_policy(&self) -> DeferrablePolicy {
        DeferrablePolicy::default()
    }

    /// Assembles a `ToolDefinition::Builtin` from the accessor methods above.
    ///
    /// `category` and `full_parameters` are always left as `None`.
    fn to_definition(&self) -> ToolDefinition {
        ToolDefinition::Builtin(BuiltinTool {
            name: self.name().to_string(),
            display_name: self.display_name().map(|s| s.to_string()),
            description: self.description().to_string(),
            parameters: self.parameters_schema(),
            policy: self.policy(),
            category: None,
            deferrable: self.deferrable_policy(),
            hints: self.hints(),
            full_parameters: None,
        })
    }
}
593
/// A collection of tools keyed by name.
///
/// Tools are stored behind `Arc`, so cloning the registry shares the tool instances.
#[derive(Default, Clone)]
pub struct ToolRegistry {
    /// Registered tools, keyed by the value of `Tool::name` at registration time.
    tools: HashMap<String, Arc<dyn Tool>>,
}
624
625impl ToolRegistry {
626 pub fn new() -> Self {
628 Self {
629 tools: HashMap::new(),
630 }
631 }
632
633 pub fn with_defaults() -> Self {
645 use crate::capabilities::{
646 AddTool, DeleteFileTool, DivideTool, EditFileTool, GetCurrentTimeTool, GetForecastTool,
647 GetWeatherTool, GrepFilesTool, ListDirectoryTool, MultiplyTool, ReadFileTool,
648 StatFileTool, SubtractTool, WebFetchTool, WriteFileTool, WriteTodosTool,
649 };
650 use crate::progress_reporting::ReportProgressTool;
651
652 ToolRegistry::builder()
653 .tool(GetCurrentTimeTool)
654 .tool(EchoTool)
655 .tool(ReportProgressTool)
665 .tool(AddTool)
667 .tool(SubtractTool)
668 .tool(MultiplyTool)
669 .tool(DivideTool)
670 .tool(GetWeatherTool)
672 .tool(GetForecastTool)
673 .tool(WriteTodosTool)
675 .tool(ReadFileTool)
677 .tool(WriteFileTool)
678 .tool(EditFileTool)
679 .tool(ListDirectoryTool)
680 .tool(GrepFilesTool)
681 .tool(DeleteFileTool)
682 .tool(StatFileTool)
683 .tool(WebFetchTool::default())
685 .build()
686 }
687
688 pub fn register(&mut self, tool: impl Tool + 'static) {
692 self.tools.insert(tool.name().to_string(), Arc::new(tool));
693 }
694
695 pub fn register_boxed(&mut self, tool: Box<dyn Tool>) {
697 self.tools.insert(tool.name().to_string(), Arc::from(tool));
698 }
699
700 pub fn register_arc(&mut self, tool: Arc<dyn Tool>) {
702 self.tools.insert(tool.name().to_string(), tool);
703 }
704
705 pub fn get(&self, name: &str) -> Option<&Arc<dyn Tool>> {
707 self.tools.get(name)
708 }
709
710 pub fn has(&self, name: &str) -> bool {
712 self.tools.contains_key(name)
713 }
714
715 pub fn len(&self) -> usize {
717 self.tools.len()
718 }
719
720 pub fn is_empty(&self) -> bool {
722 self.tools.is_empty()
723 }
724
725 pub fn tool_names(&self) -> Vec<&str> {
727 self.tools.keys().map(|s| s.as_str()).collect()
728 }
729
730 pub fn tool_definitions(&self) -> Vec<ToolDefinition> {
735 self.tools.values().map(|t| t.to_definition()).collect()
736 }
737
738 pub fn unregister(&mut self, name: &str) -> Option<Arc<dyn Tool>> {
740 self.tools.remove(name)
741 }
742
743 pub fn clear(&mut self) {
745 self.tools.clear();
746 }
747
748 pub fn builder() -> ToolRegistryBuilder {
750 ToolRegistryBuilder::new()
751 }
752}
753
754impl std::fmt::Debug for ToolRegistry {
755 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756 f.debug_struct("ToolRegistry")
757 .field("tools", &self.tool_names())
758 .finish()
759 }
760}
761
762#[async_trait]
763impl ToolExecutor for ToolRegistry {
764 async fn execute(
765 &self,
766 tool_call: &ToolCall,
767 _tool_def: &ToolDefinition,
768 ) -> Result<ToolResult> {
769 let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
770 crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
771 })?;
772
773 let result = tool.execute(tool_call.arguments.clone()).await;
774 Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
775 }
776
777 async fn execute_with_context(
778 &self,
779 tool_call: &ToolCall,
780 _tool_def: &ToolDefinition,
781 context: &ToolContext,
782 ) -> Result<ToolResult> {
783 let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
784 crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
785 })?;
786
787 let result = tool
790 .execute_with_context(tool_call.arguments.clone(), context)
791 .await;
792 Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
793 }
794}
795
796pub struct ToolRegistryBuilder {
811 registry: ToolRegistry,
812}
813
814impl ToolRegistryBuilder {
815 pub fn new() -> Self {
817 Self {
818 registry: ToolRegistry::new(),
819 }
820 }
821
822 pub fn tool(mut self, tool: impl Tool + 'static) -> Self {
824 self.registry.register(tool);
825 self
826 }
827
828 pub fn tool_boxed(mut self, tool: Box<dyn Tool>) -> Self {
830 self.registry.register_boxed(tool);
831 self
832 }
833
834 pub fn tool_arc(mut self, tool: Arc<dyn Tool>) -> Self {
836 self.registry.register_arc(tool);
837 self
838 }
839
840 pub fn build(self) -> ToolRegistry {
842 self.registry
843 }
844}
845
846impl Default for ToolRegistryBuilder {
847 fn default() -> Self {
848 Self::new()
849 }
850}
851
852pub struct EchoTool;
858
859#[async_trait]
860impl Tool for EchoTool {
861 fn name(&self) -> &str {
862 "echo"
863 }
864
865 fn display_name(&self) -> Option<&str> {
866 Some("Echo")
867 }
868
869 fn description(&self) -> &str {
870 "Echo back the provided message. Useful for testing tool execution."
871 }
872
873 fn parameters_schema(&self) -> Value {
874 serde_json::json!({
875 "type": "object",
876 "properties": {
877 "message": {
878 "type": "string",
879 "description": "The message to echo back"
880 }
881 },
882 "required": ["message"],
883 "additionalProperties": false
884 })
885 }
886
887 fn hints(&self) -> ToolHints {
888 ToolHints::default()
889 .with_readonly(true)
890 .with_idempotent(true)
891 }
892
893 async fn execute(&self, arguments: Value) -> ToolExecutionResult {
894 let message = arguments
895 .get("message")
896 .and_then(|v| v.as_str())
897 .unwrap_or("");
898
899 ToolExecutionResult::success(serde_json::json!({
900 "echoed": message,
901 "length": message.len()
902 }))
903 }
904}
905
/// Tool (`spawn_background`) that starts another registered tool as a
/// background run, or schedules it when a `schedule` argument is supplied.
pub struct SpawnBackgroundTool;
908
/// Parsed form of the `schedule` argument of `spawn_background`.
///
/// `parse_background_schedule` guarantees that exactly one of
/// `cron_expression` and `scheduled_at` is set.
#[derive(Debug, Clone)]
struct BackgroundScheduleRequest {
    /// Trimmed, non-empty cron expression for a recurring schedule.
    cron_expression: Option<String>,
    /// Trigger time for a one-shot schedule, converted to UTC.
    scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
    /// Timezone string as supplied by the caller; `"UTC"` when absent or blank.
    timezone: String,
}
915
916fn parse_background_schedule(
917 arguments: &Value,
918) -> std::result::Result<Option<BackgroundScheduleRequest>, String> {
919 let Some(schedule) = arguments.get("schedule") else {
920 return Ok(None);
921 };
922 let Some(schedule) = schedule.as_object() else {
923 return Err("schedule must be an object".to_string());
924 };
925
926 let cron_expression = schedule
927 .get("cron_expression")
928 .and_then(Value::as_str)
929 .map(str::trim)
930 .filter(|value| !value.is_empty())
931 .map(ToString::to_string);
932 let scheduled_at = match schedule.get("scheduled_at").and_then(Value::as_str) {
933 Some(value) => {
934 let value = value.trim();
935 if value.is_empty() {
936 None
937 } else {
938 Some(
939 chrono::DateTime::parse_from_rfc3339(value)
940 .map_err(|_| "scheduled_at must be RFC3339".to_string())?
941 .with_timezone(&chrono::Utc),
942 )
943 }
944 }
945 None => None,
946 };
947
948 match (cron_expression.is_some(), scheduled_at.is_some()) {
949 (false, false) => {
950 return Err(
951 "schedule must include exactly one of cron_expression (recurring) or scheduled_at (one-shot)"
952 .to_string(),
953 );
954 }
955 (true, true) => {
956 return Err(
957 "schedule must not include both cron_expression and scheduled_at; provide exactly one"
958 .to_string(),
959 );
960 }
961 _ => {}
962 }
963
964 let timezone = schedule
965 .get("timezone")
966 .and_then(Value::as_str)
967 .map(str::trim)
968 .filter(|value| !value.is_empty())
969 .unwrap_or("UTC")
970 .to_string();
971
972 Ok(Some(BackgroundScheduleRequest {
973 cron_expression,
974 scheduled_at,
975 timezone,
976 }))
977}
978
979fn build_background_schedule_description(
980 tool_name: &str,
981 tool_args: &Value,
982 title: &str,
983 signal_on_completion: bool,
984) -> String {
985 let payload = json!({
986 "tool": tool_name,
987 "title": title,
988 "signal_on_completion": signal_on_completion,
989 "args": tool_args,
990 });
991 let payload_json =
992 serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
993
994 format!(
995 "Monitor: {title}\n\n\
996This scheduled monitor fired. Start the background run now.\n\n\
997spawn_background payload:\n{payload_json}"
998 )
999}
1000
#[async_trait]
impl Tool for SpawnBackgroundTool {
    fn name(&self) -> &str {
        "spawn_background"
    }

    fn display_name(&self) -> Option<&str> {
        Some("Spawn Background")
    }

    fn description(&self) -> &str {
        "Run a background-capable built-in tool asynchronously. Returns immediately and signals the session when the background run completes."
    }

    /// Schema: required `tool` and `args`, optional `title`, `schedule`
    /// (`cron_expression` / `scheduled_at` / `timezone`) and
    /// `signal_on_completion` (default `true`).
    fn parameters_schema(&self) -> Value {
        json!({
            "type": "object",
            "properties": {
                "tool": {
                    "type": "string",
                    "description": "Name of the built-in tool to execute in the background"
                },
                "args": {
                    "type": "object",
                    "description": "Arguments to pass to the target tool"
                },
                "title": {
                    "type": "string",
                    "description": "Optional human-readable label for the background run"
                },
                "schedule": {
                    "type": "object",
                    "description": "Optional session schedule. When provided, this creates a scheduled monitor instead of starting the run immediately.",
                    "properties": {
                        "cron_expression": {
                            "type": "string",
                            "description": "Standard 5-field cron expression for recurring runs (e.g. '*/10 * * * *' for every 10 minutes)"
                        },
                        "scheduled_at": {
                            "type": "string",
                            "description": "ISO 8601 datetime for a one-shot run (e.g. '2026-04-16T15:30:00Z')"
                        },
                        "timezone": {
                            "type": "string",
                            "description": "IANA timezone for the schedule. Default: UTC"
                        }
                    },
                    "additionalProperties": false
                },
                "signal_on_completion": {
                    "type": "boolean",
                    "description": "Send a synthetic user message back to the session when the run completes",
                    "default": true
                }
            },
            "required": ["tool", "args"],
            "additionalProperties": false
        })
    }

    /// Always fails: this tool only works through `execute_with_context`.
    async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
        ToolExecutionResult::tool_error(
            "spawn_background requires context. This tool must be executed with session context.",
        )
    }

    /// Validates the request, then either creates a schedule or starts the
    /// target tool as a detached background task.
    ///
    /// With a `schedule` argument a session schedule is created (plus a
    /// best-effort monitor task) and nothing runs now. Without one, a worker
    /// permit and a session permit are acquired, a session task is created,
    /// and the tool is spawned on the runtime; the returned JSON describes the
    /// run (`run_id`, `task_id`, artifact paths) with status `"running"`.
    async fn execute_with_context(
        &self,
        arguments: Value,
        context: &ToolContext,
    ) -> ToolExecutionResult {
        // --- Argument validation -------------------------------------------
        let tool_name = match arguments.get("tool").and_then(|v| v.as_str()) {
            Some(name) if !name.trim().is_empty() => name.trim(),
            _ => return ToolExecutionResult::tool_error("Missing required parameter: tool"),
        };
        let tool_args = match arguments.get("args") {
            Some(args) if args.is_object() => args.clone(),
            _ => {
                return ToolExecutionResult::tool_error(
                    "Missing required parameter: args (object expected)",
                );
            }
        };
        let signal_on_completion = arguments
            .get("signal_on_completion")
            .and_then(|v| v.as_bool())
            .unwrap_or(true);
        let schedule_request = match parse_background_schedule(&arguments) {
            Ok(schedule) => schedule,
            Err(message) => return ToolExecutionResult::tool_error(message),
        };

        // --- Target tool resolution ----------------------------------------
        let Some(tool_registry) = &context.tool_registry else {
            return ToolExecutionResult::tool_error(
                "Tool registry not available in this context. spawn_background requires worker-side tool execution.",
            );
        };

        let Some(tool) = tool_registry.get(tool_name).cloned() else {
            return ToolExecutionResult::tool_error(format!("Unknown tool: {tool_name}"));
        };
        if tool_name == self.name() {
            return ToolExecutionResult::tool_error(
                "spawn_background cannot target itself recursively",
            );
        }
        // The target must both advertise background support and expose an executor.
        if tool.hints().supports_background != Some(true) {
            return ToolExecutionResult::tool_error(format!(
                "Tool does not support background execution: {tool_name}"
            ));
        }
        if tool.as_background_executable().is_none() {
            return ToolExecutionResult::tool_error(format!(
                "Tool declared background support but has no background executor: {tool_name}"
            ));
        }
        // Title precedence: explicit non-blank `title`, then the tool's display
        // name, then "Background <tool>".
        let title = arguments
            .get("title")
            .and_then(|v| v.as_str())
            .map(str::trim)
            .filter(|s| !s.is_empty())
            .map(|s| s.to_string())
            .unwrap_or_else(|| {
                tool.display_name()
                    .map(ToString::to_string)
                    .unwrap_or_else(|| format!("Background {tool_name}"))
            });

        // --- Scheduled path: create a schedule instead of running now ------
        if let Some(schedule_request) = schedule_request {
            let Some(schedule_store) = &context.schedule_store else {
                return ToolExecutionResult::tool_error(
                    "Schedule store not available in this context. Scheduled monitors require session schedules.",
                );
            };

            // Enforce the per-session cap on active schedules.
            match schedule_store
                .count_active_schedules(context.session_id)
                .await
            {
                Ok(count) if count >= MAX_ACTIVE_SCHEDULES_PER_SESSION => {
                    return ToolExecutionResult::tool_error(format!(
                        "Maximum {MAX_ACTIVE_SCHEDULES_PER_SESSION} active schedules per session. Cancel an existing schedule first."
                    ));
                }
                Err(err) => return ToolExecutionResult::internal_error(err),
                _ => {}
            }

            let description = build_background_schedule_description(
                tool_name,
                &tool_args,
                &title,
                signal_on_completion,
            );

            return match schedule_store
                .create_schedule(
                    context.session_id,
                    description,
                    schedule_request.cron_expression.clone(),
                    schedule_request.scheduled_at,
                    schedule_request.timezone.clone(),
                )
                .await
            {
                Ok(schedule) => {
                    // Best-effort: also record a monitor task for the schedule.
                    // A failure here is logged and does not fail the call.
                    let mut monitor_task_id: Option<String> = None;
                    if let Some(ref task_registry) = context.session_task_registry {
                        let spec = json!({
                            "tool": tool_name,
                            "arguments": &tool_args,
                            "schedule_id": schedule.id.to_string(),
                            "schedule_type": schedule.schedule_type,
                            "cron_expression": schedule.cron_expression,
                            "scheduled_at": schedule.scheduled_at,
                            "timezone": schedule.timezone,
                            "signal_on_completion": signal_on_completion,
                        });
                        match task_registry
                            .create(crate::session_task::CreateSessionTask {
                                session_id: context.session_id,
                                id: None,
                                kind: crate::session_task::TASK_KIND_MONITOR.to_string(),
                                display_name: title.clone(),
                                spec,
                                state: crate::session_task::SessionTaskState::Running,
                                links: crate::session_task::TaskLinks::default(),
                                wake_policy: crate::session_task::TaskWakePolicy::Silent,
                            })
                            .await
                        {
                            Ok(task) => {
                                monitor_task_id = Some(task.id);
                            }
                            Err(e) => {
                                tracing::warn!(
                                    session_id = %context.session_id,
                                    schedule_id = %schedule.id,
                                    error = %e,
                                    "Failed to create monitor task for schedule (best-effort)"
                                );
                            }
                        }
                    }
                    ToolExecutionResult::success(json!({
                        "created": true,
                        "status": "scheduled",
                        "title": title,
                        "tool": tool_name,
                        "signal_on_completion": signal_on_completion,
                        "schedule_id": schedule.id.to_string(),
                        "schedule_type": schedule.schedule_type,
                        "cron_expression": schedule.cron_expression,
                        "scheduled_at": schedule.scheduled_at,
                        "timezone": schedule.timezone,
                        "next_trigger_at": schedule.next_trigger_at,
                        "enabled": schedule.enabled,
                        "task_id": monitor_task_id,
                    }))
                }
                Err(err) => ToolExecutionResult::internal_error(err),
            };
        }

        // --- Immediate path: prerequisites ---------------------------------
        let Some(task_registry) = &context.session_task_registry else {
            return ToolExecutionResult::tool_error(
                "Session task registry not available in this context. Background runs require task tracking.",
            );
        };
        if context.file_store.is_none() {
            return ToolExecutionResult::tool_error(
                "Session file store not available in this context. spawn_background requires artifact persistence.",
            );
        }

        // Concurrency limits: the worker-wide permit first, then the
        // per-session permit. Both are moved into the spawned task below, so
        // they are released when that task ends (or on any early return here).
        let background_run_permit = match ACTIVE_BACKGROUND_RUNS_PER_WORKER.try_acquire() {
            Ok(permit) => permit,
            Err(_) => {
                return ToolExecutionResult::tool_error(format!(
                    "Worker is already running the maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER} active background runs. Try again after an existing run finishes."
                ));
            }
        };

        let session_run_permit = match try_acquire_session_background_permit(context.session_id) {
            Ok(permit) => permit,
            Err(_) => {
                return ToolExecutionResult::tool_error(format!(
                    "Maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION} active background runs per session. Wait for an existing run to finish before starting another."
                ));
            }
        };

        // Run id and the artifact paths derived from it.
        let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
        let artifact_dir = format!("/.background/{run_id}");
        let log_path = format!("{artifact_dir}/output.log");
        let result_path = format!("{artifact_dir}/result.json");

        // Record the run as a session task; failure to do so aborts the spawn.
        let (task_id, task_attempt): (Option<String>, i32) = match task_registry
            .create(crate::session_task::CreateSessionTask {
                session_id: context.session_id,
                id: None,
                kind: crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string(),
                display_name: title.clone(),
                spec: json!({
                    "tool": tool_name,
                    "arguments": &tool_args,
                    // Marked reattachable when the tool hints idempotent or read-only.
                    "reattachable": tool.hints().idempotent.unwrap_or(false)
                        || tool.hints().readonly.unwrap_or(false),
                    "signal_on_completion": signal_on_completion,
                }),
                state: crate::session_task::SessionTaskState::Running,
                links: crate::session_task::TaskLinks::default(),
                wake_policy: crate::session_task::TaskWakePolicy::Silent,
            })
            .await
        {
            Ok(task) => (Some(task.id), task.attempt),
            Err(e) => {
                return ToolExecutionResult::internal_error_msg(format!(
                    "Failed to create background run task: {e}"
                ));
            }
        };

        // Owned copies of everything the detached task needs.
        let background_context = context.clone().with_tool_registry(tool_registry.clone());
        let sink = Arc::new(SessionBackgroundSink::new(
            background_context.clone(),
            run_id.clone(),
            title.clone(),
            tool_name.to_string(),
            log_path.clone(),
            result_path.clone(),
            signal_on_completion,
            task_id.clone(),
        ));
        let run_id_for_task = run_id.clone();
        let tool_for_task = tool.clone();
        let tool_name_for_task = tool_name.to_string();

        let cancel_registry = context.session_task_registry.clone();
        let cancel_session_id = context.session_id;
        let cancel_task_id = task_id.clone();
        let cancel_task_attempt = task_attempt;

        // --- Detached execution --------------------------------------------
        tokio::spawn(async move {
            // Hold both permits for the lifetime of the run.
            let _background_run_permit = background_run_permit;
            let _session_run_permit = session_run_permit;
            let _ = sink.status("Starting").await;

            let outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult> = match (
                cancel_registry.as_ref(),
                cancel_task_id.as_deref(),
            ) {
                // With a task record available, race the tool against a watcher
                // that heartbeats the task and polls for a cancel request.
                (Some(registry), Some(task_id_str)) => {
                    let registry = registry.clone();
                    let task_id_str = task_id_str.to_string();
                    let tool_fut = async {
                        match tool_for_task.as_background_executable() {
                            Some(background_tool) => {
                                background_tool
                                    .execute_background(tool_args, background_context, sink.clone())
                                    .await
                            }
                            None => Err(ToolExecutionResult::tool_error(format!(
                                "Tool declared background support but has no background executor: {}",
                                tool_name_for_task
                            ))),
                        }
                    };
                    let watch_fut = async {
                        loop {
                            // Every 2 seconds: heartbeat (errors ignored), then
                            // check whether cancellation was requested.
                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
                            let _ = registry
                                .update(
                                    cancel_session_id,
                                    &task_id_str,
                                    crate::session_task::SessionTaskUpdate {
                                        heartbeat_at: Some(chrono::Utc::now()),
                                        expected_attempt: Some(cancel_task_attempt),
                                        ..Default::default()
                                    },
                                )
                                .await;
                            if let Ok(Some(task)) =
                                registry.get(cancel_session_id, &task_id_str).await
                                && task.cancel_requested_at.is_some()
                            {
                                break;
                            }
                        }
                    };
                    tokio::select! {
                        result = tool_fut => result,
                        // The watcher only completes on a cancel request; the
                        // tool future is dropped and a sentinel error returned.
                        () = watch_fut => {
                            Err(ToolExecutionResult::ToolError(
                                BACKGROUND_CANCEL_SENTINEL.to_string(),
                            ))
                        }
                    }
                }
                // Without a task record, run the tool with no cancel watcher.
                _ => match tool_for_task.as_background_executable() {
                    Some(background_tool) => {
                        background_tool
                            .execute_background(tool_args, background_context, sink.clone())
                            .await
                    }
                    None => Err(ToolExecutionResult::tool_error(format!(
                        "Tool declared background support but has no background executor: {}",
                        tool_name_for_task
                    ))),
                },
            };

            // Persist artifacts and final state; a failure is only logged.
            let finalize_result = if is_canceled_outcome(&outcome) {
                sink.finalize_canceled().await
            } else {
                sink.finalize(outcome).await
            };
            if let Err(err) = finalize_result {
                tracing::warn!(
                    run_id = run_id_for_task,
                    error = %err,
                    "Background run finalization failed"
                );
            }
        });

        // Return immediately; the run continues in the spawned task.
        ToolExecutionResult::success(json!({
            "run_id": run_id,
            "resource_id": run_id,
            "task_id": task_id,
            "title": title,
            "tool": tool_name,
            "status": "running",
            "signal_on_completion": signal_on_completion,
            "artifact_dir": artifact_dir,
            "log_path": log_path,
            "result_path": result_path
        }))
    }

    /// This tool cannot work without a `ToolContext`.
    fn requires_context(&self) -> bool {
        true
    }
}
1434
/// Mutable state a `SessionBackgroundSink` keeps behind its mutex.
#[derive(Debug, Default)]
struct SessionBackgroundState {
    /// Latest status label (e.g. "Queued", "Canceled", "Completed").
    status_text: String,
    /// Most recent progress report, if any.
    progress: Option<BackgroundProgress>,
    /// NOTE(review): presumably the trailing portion of recent output — confirm against the writers.
    output_tail: String,
    /// Accumulated output text for the run's log.
    output_log: String,
    /// NOTE(review): presumably the number of chars held in `output_log` — confirm against the writers.
    output_log_chars: usize,
    /// NOTE(review): presumably set once `output_log` had to be cut off — confirm against the writers.
    output_log_truncated: bool,
}
1444
/// Size limit for a background run's output log: 256 * 1024.
/// NOTE(review): the name says the unit is characters; the enforcement site is not in view — confirm.
const MAX_BACKGROUND_OUTPUT_LOG_CHARS: usize = 256 * 1024;
1446
/// Per-run sink used by `spawn_background`: it holds the run's identity,
/// artifact paths and mutable state, and finalizes the run's artifacts and
/// session task.
struct SessionBackgroundSink {
    /// Tool context of the session that spawned the run.
    context: ToolContext,
    /// Identifier of the background run (`bg_…`).
    run_id: String,
    /// Human-readable title of the run.
    display_name: String,
    /// Name of the tool being executed.
    tool_name: String,
    /// Path the run's output log is written to.
    log_path: String,
    /// Path the run's result JSON is written to.
    result_path: String,
    /// Whether the session is signalled when the run ends.
    signal_on_completion: bool,
    /// Session task mirroring this run, if one was created.
    task_id: Option<String>,
    /// Mutable run state; an async mutex because it is locked from async code.
    state: tokio::sync::Mutex<SessionBackgroundState>,
}
1459
1460impl SessionBackgroundSink {
1461 #[allow(clippy::too_many_arguments)]
1462 fn new(
1463 context: ToolContext,
1464 run_id: String,
1465 display_name: String,
1466 tool_name: String,
1467 log_path: String,
1468 result_path: String,
1469 signal_on_completion: bool,
1470 task_id: Option<String>,
1471 ) -> Self {
1472 Self {
1473 context,
1474 run_id,
1475 display_name,
1476 tool_name,
1477 log_path,
1478 result_path,
1479 signal_on_completion,
1480 task_id,
1481 state: tokio::sync::Mutex::new(SessionBackgroundState {
1482 status_text: "Queued".to_string(),
1483 ..Default::default()
1484 }),
1485 }
1486 }
1487
1488 async fn mirror_task(&self, update: crate::session_task::SessionTaskUpdate) {
1490 let (Some(registry), Some(task_id)) = (&self.context.session_task_registry, &self.task_id)
1491 else {
1492 return;
1493 };
1494 let _ = registry
1495 .update(self.context.session_id, task_id, update)
1496 .await;
1497 }
1498
1499 async fn finalize_canceled(&self) -> Result<()> {
1500 let output_log = {
1501 let state = self.state.lock().await;
1502 let mut log = Self::final_output_log(&state);
1503 log.push_str("\nCanceled by request.\n");
1504 log
1505 };
1506 self.write_text_file(&self.log_path, &output_log).await?;
1507 let result_json = serde_json::to_string_pretty(&serde_json::json!({"status": "canceled"}))
1508 .unwrap_or_else(|_| r#"{"status":"canceled"}"#.to_string());
1509 self.write_text_file(&self.result_path, &result_json)
1510 .await?;
1511
1512 let mut state = self.state.lock().await;
1513 state.status_text = "Canceled".to_string();
1514 drop(state);
1515
1516 self.mirror_task(crate::session_task::SessionTaskUpdate {
1517 state: Some(crate::session_task::SessionTaskState::Canceled),
1518 summary: Some("Canceled by request.".to_string()),
1519 result_path: Some(self.result_path.clone()),
1520 ..Default::default()
1521 })
1522 .await;
1523 if self.signal_on_completion {
1524 self.signal_session("canceled", "Canceled by request.")
1525 .await?;
1526 }
1527 Ok(())
1528 }
1529
1530 async fn finalize(
1531 &self,
1532 outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1533 ) -> Result<()> {
1534 match outcome {
1535 Ok(outcome) => {
1536 let output_log = if let Some(raw_output) = &outcome.raw_output {
1537 raw_output.clone()
1538 } else {
1539 let state = self.state.lock().await;
1540 Self::final_output_log(&state)
1541 };
1542 self.write_text_file(&self.log_path, &output_log).await?;
1543 let result_json = serde_json::to_string_pretty(&outcome.result)
1544 .unwrap_or_else(|_| outcome.result.to_string());
1545 self.write_text_file(&self.result_path, &result_json)
1546 .await?;
1547
1548 let mut state = self.state.lock().await;
1549 state.status_text = "Completed".to_string();
1550 drop(state);
1551 self.mirror_task(crate::session_task::SessionTaskUpdate {
1552 state: Some(crate::session_task::SessionTaskState::Succeeded),
1553 summary: Some(outcome.summary.clone()),
1554 result_path: Some(self.result_path.clone()),
1555 ..Default::default()
1556 })
1557 .await;
1558 if self.signal_on_completion {
1559 self.signal_session("completed", &outcome.summary).await?;
1560 }
1561 }
1562 Err(err) => {
1563 let message = match err {
1564 ToolExecutionResult::ToolError(msg) => msg,
1565 ToolExecutionResult::InternalError(inner) => inner.message,
1566 ToolExecutionResult::ConnectionRequired { provider } => {
1567 format!("Background tool requires connection setup: {provider}")
1568 }
1569 ToolExecutionResult::Success(_)
1570 | ToolExecutionResult::SuccessWithImages { .. } => {
1571 "Background run ended unexpectedly".to_string()
1572 }
1573 };
1574 let output_log = {
1575 let state = self.state.lock().await;
1576 Self::final_output_log(&state)
1577 };
1578 self.write_text_file(&self.log_path, &output_log).await?;
1579 let error_json = serde_json::to_string_pretty(&json!({
1580 "status": "failed",
1581 "error": &message,
1582 }))
1583 .unwrap_or_else(|_| {
1584 json!({
1585 "status": "failed",
1586 "error": &message,
1587 })
1588 .to_string()
1589 });
1590 self.write_text_file(&self.result_path, &error_json).await?;
1591 let mut state = self.state.lock().await;
1592 state.status_text = "Failed".to_string();
1593 drop(state);
1594 self.mirror_task(crate::session_task::SessionTaskUpdate {
1595 state: Some(crate::session_task::SessionTaskState::Failed),
1596 summary: Some(message.clone()),
1597 result_path: Some(self.result_path.clone()),
1598 error: Some(crate::session_task::TaskError {
1599 kind: "error".to_string(),
1600 message: message.clone(),
1601 }),
1602 ..Default::default()
1603 })
1604 .await;
1605 if self.signal_on_completion {
1606 self.signal_session("failed", &message).await?;
1607 }
1608 }
1609 }
1610
1611 Ok(())
1612 }
1613
1614 async fn signal_session(&self, status: &str, summary: &str) -> Result<()> {
1615 let Some(platform_store) = &self.context.platform_store else {
1616 return Ok(());
1617 };
1618 let message = format!(
1619 "Background run {status}.\n- run_id: {}\n- title: {}\n- tool: {}\n- summary: {}\n- result_path: {}\n- log_path: {}",
1620 self.run_id,
1621 self.display_name,
1622 self.tool_name,
1623 summary,
1624 self.result_path,
1625 self.log_path
1626 );
1627 platform_store
1628 .send_message(self.context.session_id, &message)
1629 .await
1630 }
1631
1632 async fn write_text_file(&self, path: &str, content: &str) -> Result<()> {
1633 let file_store = self.context.file_store.as_ref().ok_or_else(|| {
1634 anyhow::anyhow!(
1635 "background run {} cannot persist artifact {} because no session file store is configured",
1636 self.run_id,
1637 path
1638 )
1639 })?;
1640
1641 ensure_directory(file_store.as_ref(), self.context.session_id, "/.background").await?;
1642 let run_dir = format!("/.background/{}", self.run_id);
1643 ensure_directory(file_store.as_ref(), self.context.session_id, &run_dir).await?;
1644 file_store
1645 .write_file(self.context.session_id, path, content, "text")
1646 .await?;
1647 Ok(())
1648 }
1649}
1650
1651#[async_trait]
1652impl BackgroundEventSink for SessionBackgroundSink {
1653 async fn status(&self, message: &str) -> Result<()> {
1654 let mut state = self.state.lock().await;
1655 state.status_text = message.to_string();
1656 drop(state);
1657 self.mirror_task(crate::session_task::SessionTaskUpdate {
1658 state_detail: Some(message.to_string()),
1659 ..Default::default()
1660 })
1661 .await;
1662 Ok(())
1663 }
1664
1665 async fn output(&self, stream: &str, delta: &str) -> Result<()> {
1666 let mut state = self.state.lock().await;
1667 if !delta.is_empty() {
1668 let prefix = format!("[{stream}] ");
1669 state.output_tail.push_str(&prefix);
1670 state.output_tail.push_str(delta);
1671 Self::append_to_output_log(&mut state, &prefix, delta);
1672 if state.output_tail.chars().count() > 2048 {
1673 state.output_tail = state
1674 .output_tail
1675 .chars()
1676 .rev()
1677 .take(2048)
1678 .collect::<Vec<_>>()
1679 .into_iter()
1680 .rev()
1681 .collect();
1682 }
1683 }
1684 Ok(())
1685 }
1686
1687 async fn progress(&self, progress: BackgroundProgress) -> Result<()> {
1688 let mut state = self.state.lock().await;
1689 state.progress = Some(progress.clone());
1690 drop(state);
1691 self.mirror_task(crate::session_task::SessionTaskUpdate {
1692 progress: Some(progress),
1693 ..Default::default()
1694 })
1695 .await;
1696 Ok(())
1697 }
1698}
1699
1700impl SessionBackgroundSink {
1701 fn append_to_output_log(state: &mut SessionBackgroundState, prefix: &str, delta: &str) {
1702 if state.output_log_chars >= MAX_BACKGROUND_OUTPUT_LOG_CHARS {
1703 state.output_log_truncated = true;
1704 return;
1705 }
1706
1707 let chunk = format!("{prefix}{delta}");
1708 let remaining = MAX_BACKGROUND_OUTPUT_LOG_CHARS - state.output_log_chars;
1709 let chunk_chars = chunk.chars().count();
1710
1711 if chunk_chars <= remaining {
1712 state.output_log.push_str(&chunk);
1713 state.output_log_chars += chunk_chars;
1714 return;
1715 }
1716
1717 let truncated_chunk: String = chunk.chars().take(remaining).collect();
1718 state.output_log.push_str(&truncated_chunk);
1719 state.output_log_chars += truncated_chunk.chars().count();
1720 state.output_log_truncated = true;
1721 }
1722
1723 fn final_output_log(state: &SessionBackgroundState) -> String {
1724 if !state.output_log_truncated {
1725 return state.output_log.clone();
1726 }
1727
1728 format!(
1729 "{}\n[system] background output truncated at {} characters\n",
1730 state.output_log, MAX_BACKGROUND_OUTPUT_LOG_CHARS
1731 )
1732 }
1733}
1734
/// Marker message carried in a `ToolExecutionResult::ToolError` to denote that a background
/// run was stopped by a cancel request; `is_canceled_outcome` checks for it.
const BACKGROUND_CANCEL_SENTINEL: &str = "__everruns_background_cancel__";
1739
1740fn is_canceled_outcome(
1744 outcome: &std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1745) -> bool {
1746 matches!(outcome, Err(ToolExecutionResult::ToolError(msg)) if msg == BACKGROUND_CANCEL_SENTINEL)
1747}
1748
1749pub(crate) async fn reattach_background_run(
1765 task: &crate::session_task::SessionTask,
1766 context: &crate::traits::ToolContext,
1767) -> crate::error::Result<()> {
1768 if context.file_store.is_none() {
1771 return Err(crate::error::AgentLoopError::tool(
1772 "file store not available; cannot re-attach background run",
1773 ));
1774 }
1775 if context.session_task_registry.is_none() {
1776 return Err(crate::error::AgentLoopError::tool(
1777 "task registry not available; cannot re-attach background run",
1778 ));
1779 }
1780
1781 let tool_name: String = task
1782 .spec
1783 .get("tool")
1784 .and_then(|v| v.as_str())
1785 .filter(|s| !s.is_empty())
1786 .map(str::to_owned)
1787 .ok_or_else(|| {
1788 crate::error::AgentLoopError::tool(
1789 "background_tool spec missing 'tool' field; cannot re-attach",
1790 )
1791 })?;
1792
1793 let tool_args = task
1794 .spec
1795 .get("arguments")
1796 .cloned()
1797 .unwrap_or_else(|| serde_json::Value::Object(Default::default()));
1798
1799 let registry = std::sync::Arc::new(ToolRegistry::with_defaults());
1800
1801 let Some(tool) = registry.get(&tool_name).cloned() else {
1802 return Err(crate::error::AgentLoopError::tool(format!(
1803 "tool '{tool_name}' not found in built-in registry; cannot re-attach"
1804 )));
1805 };
1806
1807 if tool.as_background_executable().is_none() {
1808 return Err(crate::error::AgentLoopError::tool(format!(
1809 "tool '{tool_name}' does not support background execution; cannot re-attach"
1810 )));
1811 }
1812
1813 let hints = tool.hints();
1816 if !hints.idempotent.unwrap_or(false) && !hints.readonly.unwrap_or(false) {
1817 return Err(crate::error::AgentLoopError::tool(format!(
1818 "tool '{tool_name}' is not idempotent or readonly; re-attach declined",
1819 )));
1820 }
1821
1822 let background_run_permit = ACTIVE_BACKGROUND_RUNS_PER_WORKER
1825 .try_acquire()
1826 .map_err(|_| {
1827 crate::error::AgentLoopError::tool(
1828 "worker background run limit reached; re-attach deferred",
1829 )
1830 })?;
1831 let session_run_permit =
1832 try_acquire_session_background_permit(task.session_id).map_err(|_| {
1833 crate::error::AgentLoopError::tool(
1834 "session background run limit reached; re-attach deferred",
1835 )
1836 })?;
1837
1838 let signal_on_completion = task
1841 .spec
1842 .get("signal_on_completion")
1843 .and_then(|v| v.as_bool())
1844 .unwrap_or(true);
1845
1846 let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
1847 let artifact_dir = format!("/.background/{run_id}");
1848 let log_path = format!("{artifact_dir}/output.log");
1849 let result_path = format!("{artifact_dir}/result.json");
1850
1851 let task_id = task.id.clone();
1852 let task_attempt = task.attempt;
1853 let session_id = task.session_id;
1854
1855 let sink_context = context.clone().with_tool_registry(registry);
1856 let sink = std::sync::Arc::new(SessionBackgroundSink::new(
1857 sink_context.clone(),
1858 run_id.clone(),
1859 task.display_name.clone(),
1860 tool_name.to_string(),
1861 log_path,
1862 result_path,
1863 signal_on_completion,
1864 Some(task_id.clone()),
1865 ));
1866
1867 let cancel_registry = context.session_task_registry.clone();
1868 let run_id_for_log = run_id.clone();
1869
1870 tokio::spawn(async move {
1871 let _background_run_permit = background_run_permit;
1873 let _session_run_permit = session_run_permit;
1874 let _ = sink.status("Re-attaching").await;
1875
1876 let outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
1877 match (cancel_registry.as_ref(), Some(task_id.as_str())) {
1878 (Some(registry), Some(task_id_str)) => {
1879 let registry = registry.clone();
1880 let task_id_str = task_id_str.to_string();
1881 let tool_fut = async {
1882 match tool.as_background_executable() {
1883 Some(bg) => {
1884 bg.execute_background(tool_args, sink_context.clone(), sink.clone())
1885 .await
1886 }
1887 None => Err(ToolExecutionResult::tool_error(format!(
1888 "tool '{tool_name}' lost background support during re-attach"
1889 ))),
1890 }
1891 };
1892 let watch_fut = async {
1893 loop {
1894 tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1895 let _ = registry
1896 .update(
1897 session_id,
1898 &task_id_str,
1899 crate::session_task::SessionTaskUpdate {
1900 heartbeat_at: Some(chrono::Utc::now()),
1901 expected_attempt: Some(task_attempt),
1902 ..Default::default()
1903 },
1904 )
1905 .await;
1906 if let Ok(Some(t)) = registry.get(session_id, &task_id_str).await
1907 && t.cancel_requested_at.is_some()
1908 {
1909 break;
1910 }
1911 }
1912 };
1913 tokio::select! {
1914 result = tool_fut => result,
1915 () = watch_fut => Err(ToolExecutionResult::ToolError(
1916 BACKGROUND_CANCEL_SENTINEL.to_string(),
1917 )),
1918 }
1919 }
1920 _ => match tool.as_background_executable() {
1921 Some(bg) => {
1922 bg.execute_background(tool_args, sink_context, sink.clone())
1923 .await
1924 }
1925 None => Err(ToolExecutionResult::tool_error(format!(
1926 "tool '{tool_name}' lost background support during re-attach"
1927 ))),
1928 },
1929 };
1930
1931 let finalize_result = if is_canceled_outcome(&outcome) {
1932 sink.finalize_canceled().await
1933 } else {
1934 sink.finalize(outcome).await
1935 };
1936 if let Err(err) = finalize_result {
1937 tracing::warn!(
1938 run_id = run_id_for_log,
1939 error = %err,
1940 "Background run re-attach finalization failed"
1941 );
1942 }
1943 });
1944
1945 Ok(())
1946}
1947
1948async fn ensure_directory(
1949 file_store: &dyn crate::traits::SessionFileSystem,
1950 session_id: crate::SessionId,
1951 path: &str,
1952) -> Result<()> {
1953 if let Some(entry) = file_store.stat_file(session_id, path).await? {
1954 if entry.is_directory {
1955 return Ok(());
1956 }
1957 return Err(anyhow::anyhow!("path exists but is not a directory: {path}").into());
1958 }
1959 let _ = file_store.create_directory(session_id, path).await?;
1960 Ok(())
1961}
1962
/// Tool that fails on every execution with a configurable message and error kind.
pub struct FailingTool {
    /// Message reported by the failure.
    error_message: String,
    /// `true` to fail with an internal error, `false` to fail with a tool error.
    use_internal_error: bool,
}

impl FailingTool {
    /// Creates a tool that fails with a tool error carrying `message`.
    pub fn with_tool_error(message: impl Into<String>) -> Self {
        Self::failing_with(message.into(), false)
    }

    /// Creates a tool that fails with an internal error carrying `message`.
    pub fn with_internal_error(message: impl Into<String>) -> Self {
        Self::failing_with(message.into(), true)
    }

    /// Shared constructor behind the two public entry points.
    fn failing_with(error_message: String, use_internal_error: bool) -> Self {
        Self {
            error_message,
            use_internal_error,
        }
    }
}

impl Default for FailingTool {
    /// Fails with the tool error "Tool execution failed".
    fn default() -> Self {
        FailingTool::with_tool_error("Tool execution failed")
    }
}
1992
1993#[async_trait]
1994impl Tool for FailingTool {
1995 fn name(&self) -> &str {
1996 "failing_tool"
1997 }
1998
1999 fn display_name(&self) -> Option<&str> {
2000 Some("Failing Tool")
2001 }
2002
2003 fn description(&self) -> &str {
2004 "A tool that always fails (for testing error handling)"
2005 }
2006
2007 fn parameters_schema(&self) -> Value {
2008 serde_json::json!({
2009 "type": "object",
2010 "properties": {},
2011 "additionalProperties": false
2012 })
2013 }
2014
2015 fn hints(&self) -> ToolHints {
2016 ToolHints::default()
2017 .with_readonly(true)
2018 .with_idempotent(true)
2019 }
2020
2021 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2022 if self.use_internal_error {
2023 ToolExecutionResult::internal_error_msg(&self.error_message)
2024 } else {
2025 ToolExecutionResult::tool_error(&self.error_message)
2026 }
2027 }
2028}
2029
2030#[cfg(test)]
2035mod tests {
2036 use super::*;
2037 use crate::capabilities::GetCurrentTimeTool;
2038 use crate::platform_store::PlatformStore;
2039 use crate::session_file::{FileInfo, FileStat, SessionFile};
2040 use crate::session_task::SessionTaskRegistry;
2041 use crate::traits::{SessionFileSystem, SessionScheduleStore};
2042 use crate::typed_id::{HarnessId, SessionId};
2043 use crate::{AgentId, KeyInfo, PlatformMessage, SecretInfo};
2044 use async_trait::async_trait;
2045 use std::sync::{
2046 Arc as StdArc, Mutex,
2047 atomic::{AtomicBool, Ordering},
2048 };
2049
2050 #[derive(Default)]
2051 struct TestBackgroundTool;
2052
2053 #[async_trait]
2054 impl BackgroundExecutableTool for TestBackgroundTool {
2055 async fn execute_background(
2056 &self,
2057 arguments: Value,
2058 _context: ToolContext,
2059 sink: Arc<dyn BackgroundEventSink>,
2060 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2061 sink.status("Waiting for test result")
2062 .await
2063 .map_err(ToolExecutionResult::internal_error)?;
2064 sink.output("stdout", "hello from background")
2065 .await
2066 .map_err(ToolExecutionResult::internal_error)?;
2067 sink.progress(BackgroundProgress {
2068 current: Some(1),
2069 total: Some(1),
2070 unit: Some("step".to_string()),
2071 label: Some("done".to_string()),
2072 })
2073 .await
2074 .map_err(ToolExecutionResult::internal_error)?;
2075
2076 Ok(BackgroundOutcome {
2077 summary: arguments["summary"].as_str().unwrap_or("done").to_string(),
2078 result: json!({"ok": true}),
2079 raw_output: None,
2080 })
2081 }
2082 }
2083
2084 #[async_trait]
2085 impl Tool for TestBackgroundTool {
2086 fn name(&self) -> &str {
2087 "test_background"
2088 }
2089
2090 fn display_name(&self) -> Option<&str> {
2091 Some("Test Background")
2092 }
2093
2094 fn description(&self) -> &str {
2095 "test tool"
2096 }
2097
2098 fn parameters_schema(&self) -> Value {
2099 json!({
2100 "type": "object",
2101 "properties": {
2102 "summary": { "type": "string" }
2103 }
2104 })
2105 }
2106
2107 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2108 ToolExecutionResult::tool_error("foreground unsupported")
2109 }
2110
2111 fn hints(&self) -> ToolHints {
2112 ToolHints::default().with_supports_background(true)
2113 }
2114
2115 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2116 Some(self)
2117 }
2118 }
2119
2120 #[derive(Default)]
2121 struct TestFailingBackgroundTool;
2122
2123 #[async_trait]
2124 impl BackgroundExecutableTool for TestFailingBackgroundTool {
2125 async fn execute_background(
2126 &self,
2127 _arguments: Value,
2128 _context: ToolContext,
2129 sink: Arc<dyn BackgroundEventSink>,
2130 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2131 sink.status("Running failing test")
2132 .await
2133 .map_err(ToolExecutionResult::internal_error)?;
2134 sink.output("stderr", "background failed")
2135 .await
2136 .map_err(ToolExecutionResult::internal_error)?;
2137 Err(ToolExecutionResult::tool_error("boom"))
2138 }
2139 }
2140
2141 #[async_trait]
2142 impl Tool for TestFailingBackgroundTool {
2143 fn name(&self) -> &str {
2144 "test_background_fail"
2145 }
2146
2147 fn display_name(&self) -> Option<&str> {
2148 Some("Test Background Fail")
2149 }
2150
2151 fn description(&self) -> &str {
2152 "failing background test tool"
2153 }
2154
2155 fn parameters_schema(&self) -> Value {
2156 json!({
2157 "type": "object",
2158 "properties": {}
2159 })
2160 }
2161
2162 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2163 ToolExecutionResult::tool_error("foreground unsupported")
2164 }
2165
2166 fn hints(&self) -> ToolHints {
2167 ToolHints::default().with_supports_background(true)
2168 }
2169
2170 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2171 Some(self)
2172 }
2173 }
2174
2175 #[derive(Default)]
2176 struct TestLargeOutputBackgroundTool;
2177
2178 #[async_trait]
2179 impl BackgroundExecutableTool for TestLargeOutputBackgroundTool {
2180 async fn execute_background(
2181 &self,
2182 _arguments: Value,
2183 _context: ToolContext,
2184 sink: Arc<dyn BackgroundEventSink>,
2185 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2186 let large_chunk = "x".repeat(MAX_BACKGROUND_OUTPUT_LOG_CHARS + 4096);
2187 sink.output("stdout", &large_chunk)
2188 .await
2189 .map_err(ToolExecutionResult::internal_error)?;
2190 Ok(BackgroundOutcome {
2191 summary: "large output complete".to_string(),
2192 result: json!({"ok": true}),
2193 raw_output: None,
2194 })
2195 }
2196 }
2197
2198 #[async_trait]
2199 impl Tool for TestLargeOutputBackgroundTool {
2200 fn name(&self) -> &str {
2201 "test_background_large_output"
2202 }
2203
2204 fn display_name(&self) -> Option<&str> {
2205 Some("Test Background Large Output")
2206 }
2207
2208 fn description(&self) -> &str {
2209 "background test tool with huge output"
2210 }
2211
2212 fn parameters_schema(&self) -> Value {
2213 json!({
2214 "type": "object",
2215 "properties": {}
2216 })
2217 }
2218
2219 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2220 ToolExecutionResult::tool_error("foreground unsupported")
2221 }
2222
2223 fn hints(&self) -> ToolHints {
2224 ToolHints::default().with_supports_background(true)
2225 }
2226
2227 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2228 Some(self)
2229 }
2230 }
2231
2232 struct BlockingBackgroundTool {
2233 release: StdArc<AtomicBool>,
2234 }
2235
2236 #[async_trait]
2237 impl BackgroundExecutableTool for BlockingBackgroundTool {
2238 async fn execute_background(
2239 &self,
2240 _arguments: Value,
2241 _context: ToolContext,
2242 sink: Arc<dyn BackgroundEventSink>,
2243 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2244 sink.status("Blocking until released")
2245 .await
2246 .map_err(ToolExecutionResult::internal_error)?;
2247 while !self.release.load(Ordering::SeqCst) {
2248 tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2249 }
2250 Ok(BackgroundOutcome {
2251 summary: "released".to_string(),
2252 result: json!({"ok": true}),
2253 raw_output: None,
2254 })
2255 }
2256 }
2257
2258 #[async_trait]
2259 impl Tool for BlockingBackgroundTool {
2260 fn name(&self) -> &str {
2261 "test_background_blocking"
2262 }
2263
2264 fn display_name(&self) -> Option<&str> {
2265 Some("Test Background Blocking")
2266 }
2267
2268 fn description(&self) -> &str {
2269 "background test tool that waits for test release"
2270 }
2271
2272 fn parameters_schema(&self) -> Value {
2273 json!({
2274 "type": "object",
2275 "properties": {}
2276 })
2277 }
2278
2279 async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2280 ToolExecutionResult::tool_error("foreground unsupported")
2281 }
2282
2283 fn hints(&self) -> ToolHints {
2284 ToolHints::default().with_supports_background(true)
2285 }
2286
2287 fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2288 Some(self)
2289 }
2290 }
2291
2292 #[derive(Default)]
2293 struct TestFileStore {
2294 files: Mutex<HashMap<String, SessionFile>>,
2295 }
2296
2297 #[async_trait]
2298 impl crate::traits::SessionFileSystem for TestFileStore {
2299 async fn read_file(
2300 &self,
2301 _session_id: SessionId,
2302 path: &str,
2303 ) -> crate::Result<Option<SessionFile>> {
2304 Ok(self.files.lock().unwrap().get(path).cloned())
2305 }
2306
2307 async fn write_file(
2308 &self,
2309 session_id: SessionId,
2310 path: &str,
2311 content: &str,
2312 encoding: &str,
2313 ) -> crate::Result<SessionFile> {
2314 let now = chrono::Utc::now();
2315 let file = SessionFile {
2316 id: uuid::Uuid::now_v7(),
2317 session_id: session_id.uuid(),
2318 path: path.to_string(),
2319 name: FileInfo::name_from_path(path),
2320 content: Some(content.to_string()),
2321 encoding: encoding.to_string(),
2322 is_directory: false,
2323 is_readonly: false,
2324 size_bytes: content.len() as i64,
2325 created_at: now,
2326 updated_at: now,
2327 };
2328 self.files
2329 .lock()
2330 .unwrap()
2331 .insert(path.to_string(), file.clone());
2332 Ok(file)
2333 }
2334
2335 async fn delete_file(
2336 &self,
2337 _session_id: SessionId,
2338 _path: &str,
2339 _recursive: bool,
2340 ) -> crate::Result<bool> {
2341 Ok(false)
2342 }
2343
2344 async fn list_directory(
2345 &self,
2346 _session_id: SessionId,
2347 _path: &str,
2348 ) -> crate::Result<Vec<FileInfo>> {
2349 Ok(Vec::new())
2350 }
2351
2352 async fn stat_file(
2353 &self,
2354 _session_id: SessionId,
2355 path: &str,
2356 ) -> crate::Result<Option<FileStat>> {
2357 let file = self.files.lock().unwrap().get(path).cloned();
2358 Ok(file.map(|entry| FileStat {
2359 path: entry.path,
2360 name: entry.name,
2361 is_directory: entry.is_directory,
2362 is_readonly: entry.is_readonly,
2363 size_bytes: entry.size_bytes,
2364 created_at: entry.created_at,
2365 updated_at: entry.updated_at,
2366 }))
2367 }
2368
2369 async fn grep_files(
2370 &self,
2371 _session_id: SessionId,
2372 _pattern: &str,
2373 _path_pattern: Option<&str>,
2374 ) -> crate::Result<Vec<crate::session_file::GrepMatch>> {
2375 Ok(Vec::new())
2376 }
2377
2378 async fn create_directory(
2379 &self,
2380 session_id: SessionId,
2381 path: &str,
2382 ) -> crate::Result<FileInfo> {
2383 let now = chrono::Utc::now();
2384 let id = uuid::Uuid::now_v7();
2385 let dir = SessionFile {
2386 id,
2387 session_id: session_id.uuid(),
2388 path: path.to_string(),
2389 name: FileInfo::name_from_path(path),
2390 content: None,
2391 encoding: "text".to_string(),
2392 is_directory: true,
2393 is_readonly: false,
2394 size_bytes: 0,
2395 created_at: now,
2396 updated_at: now,
2397 };
2398 self.files.lock().unwrap().insert(path.to_string(), dir);
2399 Ok(FileInfo {
2400 id,
2401 session_id: session_id.uuid(),
2402 path: path.to_string(),
2403 name: FileInfo::name_from_path(path),
2404 is_directory: true,
2405 is_readonly: false,
2406 size_bytes: 0,
2407 created_at: now,
2408 updated_at: now,
2409 })
2410 }
2411 }
2412
    /// Platform store test double that only records the messages sent to sessions.
    ///
    /// Lookups and listings return empty results, deletions succeed without doing anything,
    /// and every method that would have to build a platform entity panics via
    /// `unreachable!()`.
    #[derive(Default)]
    struct TestPlatformStore {
        /// Content of every message passed to `send_message`, in call order.
        sent_messages: Mutex<Vec<String>>,
    }

    #[async_trait]
    impl PlatformStore for TestPlatformStore {
        async fn list_harnesses(&self) -> crate::Result<Vec<crate::Harness>> {
            Ok(Vec::new())
        }
        async fn get_harness(&self, _id: HarnessId) -> crate::Result<Option<crate::Harness>> {
            Ok(None)
        }
        async fn create_harness(
            &self,
            _name: &str,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: Option<&str>,
            _parent_harness_id: Option<HarnessId>,
            _capabilities: &[String],
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        async fn update_harness(
            &self,
            _id: HarnessId,
            _name: Option<&str>,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: Option<&str>,
            _parent_harness_id: Option<Option<HarnessId>>,
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        async fn delete_harness(&self, _id: HarnessId) -> crate::Result<()> {
            Ok(())
        }
        async fn copy_harness(
            &self,
            _id: HarnessId,
            _new_name: Option<&str>,
        ) -> crate::Result<crate::Harness> {
            unreachable!()
        }
        async fn list_agents(&self) -> crate::Result<Vec<crate::Agent>> {
            Ok(Vec::new())
        }
        async fn get_agent_by_id(&self, _id: AgentId) -> crate::Result<Option<crate::Agent>> {
            Ok(None)
        }
        async fn create_agent(
            &self,
            _name: &str,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: &str,
            _capabilities: &[String],
        ) -> crate::Result<crate::Agent> {
            unreachable!()
        }
        async fn update_agent(
            &self,
            _id: AgentId,
            _name: Option<&str>,
            _display_name: Option<&str>,
            _description: Option<&str>,
            _system_prompt: Option<&str>,
        ) -> crate::Result<crate::Agent> {
            unreachable!()
        }
        async fn delete_agent(&self, _id: AgentId) -> crate::Result<()> {
            Ok(())
        }
        async fn list_apps(
            &self,
            _search: Option<&str>,
            _include_archived: bool,
        ) -> crate::Result<Vec<crate::App>> {
            Ok(Vec::new())
        }
        async fn get_app(&self, _id: crate::AppId) -> crate::Result<Option<crate::App>> {
            Ok(None)
        }
        async fn create_app(
            &self,
            _name: &str,
            _description: Option<&str>,
            _harness_id: HarnessId,
            _agent_id: Option<AgentId>,
            _agent_identity_id: Option<crate::AgentIdentityId>,
            _channel_type: Option<crate::ChannelType>,
            _channel_config: Option<&serde_json::Value>,
        ) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn update_app(
            &self,
            _id: crate::AppId,
            _name: Option<&str>,
            _description: Option<&str>,
            _harness_id: Option<HarnessId>,
            _agent_id: Option<AgentId>,
            _agent_identity_id: Option<Option<crate::AgentIdentityId>>,
        ) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn delete_app(&self, _id: crate::AppId) -> crate::Result<()> {
            Ok(())
        }
        async fn destroy_app(&self, _id: crate::AppId) -> crate::Result<()> {
            Ok(())
        }
        async fn publish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn unpublish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
            unreachable!()
        }
        async fn add_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_type: crate::ChannelType,
            _channel_config: Option<&serde_json::Value>,
            _enabled: Option<bool>,
        ) -> crate::Result<crate::AppChannel> {
            unreachable!()
        }
        async fn update_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_id: crate::AppChannelId,
            _channel_type: Option<crate::ChannelType>,
            _channel_config: Option<&serde_json::Value>,
            _enabled: Option<bool>,
        ) -> crate::Result<crate::AppChannel> {
            unreachable!()
        }
        async fn delete_app_channel(
            &self,
            _app_id: crate::AppId,
            _channel_id: crate::AppChannelId,
        ) -> crate::Result<()> {
            Ok(())
        }
        async fn list_sessions(
            &self,
            _limit: Option<usize>,
            _agent_id: Option<AgentId>,
        ) -> crate::Result<Vec<crate::Session>> {
            Ok(Vec::new())
        }
        async fn create_session(
            &self,
            _harness_id: HarnessId,
            _agent_id: Option<AgentId>,
            _title: Option<&str>,
            _locale: Option<&str>,
            _blueprint_id: Option<&str>,
            _blueprint_config: Option<&serde_json::Value>,
            _parent_session_id: Option<SessionId>,
        ) -> crate::Result<crate::Session> {
            unreachable!()
        }
        async fn get_session_by_id(&self, _id: SessionId) -> crate::Result<Option<crate::Session>> {
            Ok(None)
        }
        // Returns an empty report for the requested session with a fixed model name.
        async fn get_session_context_report(
            &self,
            id: SessionId,
        ) -> crate::Result<crate::SessionContextReport> {
            Ok(crate::SessionContextReport {
                session_id: id.to_string(),
                model: "llmsim".to_string(),
                context_window_tokens: None,
                estimated_input_tokens: 0,
                sections: vec![],
                contributions: vec![],
                cumulative_usage: None,
            })
        }
        async fn delete_session(&self, _id: SessionId) -> crate::Result<()> {
            Ok(())
        }
        // Records the message content; the target session id is not stored.
        async fn send_message(&self, _session_id: SessionId, content: &str) -> crate::Result<()> {
            self.sent_messages.lock().unwrap().push(content.to_string());
            Ok(())
        }
        async fn get_messages(
            &self,
            _session_id: SessionId,
            _limit: Option<usize>,
        ) -> crate::Result<Vec<PlatformMessage>> {
            Ok(Vec::new())
        }
        // Reports "idle" immediately, regardless of the timeout.
        async fn wait_for_idle(
            &self,
            _session_id: SessionId,
            _timeout_secs: Option<u64>,
        ) -> crate::Result<String> {
            Ok("idle".to_string())
        }
        async fn list_capabilities(
            &self,
            _search: Option<&str>,
        ) -> crate::Result<Vec<crate::CapabilityInfo>> {
            Ok(Vec::new())
        }
        fn base_url(&self) -> &str {
            "http://localhost:9300"
        }
    }
2625
    /// Stateless placeholder type; `SessionStorageStore` is implemented for it below.
    #[derive(Default)]
    struct NoopStorageStore;
2628
2629 #[derive(Default)]
2630 struct TestScheduleStore {
2631 schedules: Mutex<Vec<crate::session_schedule::SessionSchedule>>,
2632 }
2633
2634 #[async_trait]
2635 impl crate::traits::SessionScheduleStore for TestScheduleStore {
2636 async fn create_schedule(
2637 &self,
2638 session_id: SessionId,
2639 description: String,
2640 cron_expression: Option<String>,
2641 scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
2642 timezone: String,
2643 ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2644 let schedule = crate::session_schedule::SessionSchedule {
2645 id: crate::typed_id::ScheduleId::new(),
2646 session_id,
2647 owner_principal_id: crate::PrincipalId::from_seed(1),
2648 resolved_owner_user_id: None,
2649 owner: None,
2650 effective_owner: None,
2651 description,
2652 cron_expression: cron_expression.clone(),
2653 scheduled_at,
2654 timezone,
2655 enabled: true,
2656 schedule_type: crate::session_schedule::SessionSchedule::derive_type(
2657 &cron_expression,
2658 ),
2659 next_trigger_at: Some(chrono::Utc::now() + chrono::Duration::minutes(10)),
2660 last_triggered_at: None,
2661 trigger_count: 0,
2662 created_at: chrono::Utc::now(),
2663 updated_at: chrono::Utc::now(),
2664 };
2665 self.schedules.lock().unwrap().push(schedule.clone());
2666 Ok(schedule)
2667 }
2668
2669 async fn cancel_schedule(
2670 &self,
2671 _session_id: SessionId,
2672 schedule_id: crate::ScheduleId,
2673 ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2674 let mut schedules = self.schedules.lock().unwrap();
2675 let schedule = schedules
2676 .iter_mut()
2677 .find(|schedule| schedule.id == schedule_id)
2678 .ok_or_else(|| crate::AgentLoopError::tool("Schedule not found".to_string()))?;
2679 schedule.enabled = false;
2680 Ok(schedule.clone())
2681 }
2682
2683 async fn list_schedules(
2684 &self,
2685 session_id: SessionId,
2686 ) -> crate::Result<Vec<crate::session_schedule::SessionSchedule>> {
2687 Ok(self
2688 .schedules
2689 .lock()
2690 .unwrap()
2691 .iter()
2692 .filter(|schedule| schedule.session_id == session_id)
2693 .cloned()
2694 .collect())
2695 }
2696
2697 async fn count_active_schedules(&self, session_id: SessionId) -> crate::Result<u32> {
2698 Ok(self
2699 .schedules
2700 .lock()
2701 .unwrap()
2702 .iter()
2703 .filter(|schedule| schedule.session_id == session_id && schedule.enabled)
2704 .count() as u32)
2705 }
2706 }
2707
    /// No-op `SessionStorageStore`: writes succeed without storing anything and
    /// reads always come back empty.
    #[async_trait]
    impl crate::traits::SessionStorageStore for NoopStorageStore {
        /// Accepts the value and discards it.
        async fn set_value(
            &self,
            _session_id: SessionId,
            _key: &str,
            _value: &str,
        ) -> crate::Result<()> {
            Ok(())
        }
        /// Always reports the key as absent.
        async fn get_value(
            &self,
            _session_id: SessionId,
            _key: &str,
        ) -> crate::Result<Option<String>> {
            Ok(None)
        }
        /// Always returns `false`.
        async fn delete_value(&self, _session_id: SessionId, _key: &str) -> crate::Result<bool> {
            Ok(false)
        }
        /// Always returns an empty key list.
        async fn list_keys(&self, _session_id: SessionId) -> crate::Result<Vec<KeyInfo>> {
            Ok(Vec::new())
        }
        /// Accepts the secret and discards it.
        async fn set_secret(
            &self,
            _session_id: SessionId,
            _name: &str,
            _value: &str,
        ) -> crate::Result<()> {
            Ok(())
        }
        /// Always reports the secret as absent.
        async fn get_secret(
            &self,
            _session_id: SessionId,
            _name: &str,
        ) -> crate::Result<Option<String>> {
            Ok(None)
        }
        /// Always returns `false`.
        async fn delete_secret(&self, _session_id: SessionId, _name: &str) -> crate::Result<bool> {
            Ok(false)
        }
        /// Always returns an empty secret list.
        async fn list_secrets(&self, _session_id: SessionId) -> crate::Result<Vec<SecretInfo>> {
            Ok(Vec::new())
        }
    }
2753
2754 #[tokio::test]
2755 async fn test_echo_tool() {
2756 let tool = EchoTool;
2757
2758 let result = tool
2759 .execute(serde_json::json!({"message": "Hello, world!"}))
2760 .await;
2761
2762 if let ToolExecutionResult::Success(value) = result {
2763 assert_eq!(
2764 value.get("echoed").unwrap().as_str().unwrap(),
2765 "Hello, world!"
2766 );
2767 assert_eq!(value.get("length").unwrap().as_u64().unwrap(), 13);
2768 } else {
2769 panic!("Expected success");
2770 }
2771 }
2772
2773 #[tokio::test]
2774 async fn test_failing_tool_with_tool_error() {
2775 let tool = FailingTool::with_tool_error("Something went wrong");
2776
2777 let result = tool.execute(serde_json::json!({})).await;
2778
2779 if let ToolExecutionResult::ToolError(msg) = result {
2780 assert_eq!(msg, "Something went wrong");
2781 } else {
2782 panic!("Expected tool error");
2783 }
2784 }
2785
2786 #[tokio::test]
2787 async fn test_failing_tool_with_internal_error() {
2788 let tool = FailingTool::with_internal_error("Database connection failed");
2789
2790 let result = tool.execute(serde_json::json!({})).await;
2791
2792 if let ToolExecutionResult::InternalError(err) = result {
2793 assert_eq!(err.message, "Database connection failed");
2794 } else {
2795 panic!("Expected internal error");
2796 }
2797 }
2798
2799 #[tokio::test]
2800 async fn test_tool_result_conversion() {
2801 let result = ToolExecutionResult::success(serde_json::json!({"value": 42}));
2803 let tool_result = result.into_tool_result("call_1", "test_tool");
2804 assert!(tool_result.error.is_none());
2805 assert_eq!(tool_result.result.unwrap()["value"], 42);
2806
2807 let result = ToolExecutionResult::tool_error("Invalid input");
2809 let tool_result = result.into_tool_result("call_2", "test_tool");
2810 assert_eq!(tool_result.error.as_deref(), Some("Invalid input"));
2811 assert_eq!(
2812 tool_result.result.unwrap(),
2813 serde_json::json!({"error": "Invalid input"})
2814 );
2815
2816 let result = ToolExecutionResult::internal_error_msg("Secret database error");
2818 let tool_result = result.into_tool_result("call_3", "test_tool");
2819 assert_eq!(
2820 tool_result.error.as_deref(),
2821 Some("An internal error occurred while executing the tool")
2822 );
2823 assert_eq!(
2824 tool_result.result.unwrap(),
2825 serde_json::json!({"error": "An internal error occurred while executing the tool"})
2826 );
2827 }
2828
2829 #[tokio::test]
2830 async fn test_tool_registry() {
2831 let mut registry = ToolRegistry::new();
2832 registry.register(GetCurrentTimeTool);
2833 registry.register(EchoTool);
2834
2835 assert_eq!(registry.len(), 2);
2836 assert!(registry.has("get_current_time"));
2837 assert!(registry.has("echo"));
2838 assert!(!registry.has("nonexistent"));
2839
2840 let definitions = registry.tool_definitions();
2841 assert_eq!(definitions.len(), 2);
2842 }
2843
2844 #[tokio::test]
2845 async fn test_tool_registry_builder() {
2846 let registry = ToolRegistry::builder()
2847 .tool(GetCurrentTimeTool)
2848 .tool(EchoTool)
2849 .build();
2850
2851 assert_eq!(registry.len(), 2);
2852 }
2853
2854 #[test]
2855 fn test_tool_display_name_in_definition() {
2856 let tool = GetCurrentTimeTool;
2858 assert_eq!(tool.display_name(), Some("Get Current Time"));
2859
2860 let def = tool.to_definition();
2861 assert_eq!(def.display_name(), Some("Get Current Time"));
2862 }
2863
2864 #[test]
2865 fn test_success_with_raw_output_object_preserves_shape() {
2866 let res = ToolExecutionResult::success_with_raw_output(
2867 serde_json::json!({"stdout": "hello"}),
2868 "raw stdout bytes".to_string(),
2869 );
2870 let tr = res.into_tool_result("call_1", "demo");
2871 assert_eq!(tr.result.as_ref().unwrap()["stdout"], "hello");
2872 assert!(
2873 tr.result
2874 .as_ref()
2875 .unwrap()
2876 .as_object()
2877 .unwrap()
2878 .get("_raw_output")
2879 .is_none(),
2880 "sidecar key must not leak to the LLM-visible result"
2881 );
2882 assert_eq!(tr.raw_output.as_deref(), Some("raw stdout bytes"));
2883 }
2884
2885 #[test]
2886 fn test_success_with_raw_output_scalar_unwraps_to_string() {
2887 let res = ToolExecutionResult::success_with_raw_output(
2888 "compact summary".to_string(),
2889 "full output bytes".to_string(),
2890 );
2891 let tr = res.into_tool_result("call_1", "demo");
2892 assert_eq!(
2893 tr.result,
2894 Some(serde_json::Value::String("compact summary".into()))
2895 );
2896 assert_eq!(tr.raw_output.as_deref(), Some("full output bytes"));
2897 }
2898
2899 #[test]
2900 fn test_success_result_with_raw_output_scalar_key_is_not_unwrapped() {
2901 let res = ToolExecutionResult::success(
2902 serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}),
2903 );
2904 let tr = res.into_tool_result("call_1", "demo");
2905 assert_eq!(
2906 tr.result,
2907 Some(serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}))
2908 );
2909 assert_eq!(tr.raw_output, None);
2910 }
2911
2912 #[test]
2913 fn test_success_result_with_only_raw_output_scalar_key_is_not_unwrapped() {
2914 let res = ToolExecutionResult::success(serde_json::json!({"_raw_output_scalar": "v"}));
2917 let tr = res.into_tool_result("call_1", "demo");
2918 assert_eq!(
2919 tr.result,
2920 Some(serde_json::json!({"_raw_output_scalar": "v"}))
2921 );
2922 assert_eq!(tr.raw_output, None);
2923 }
2924
2925 #[test]
2926 fn test_echo_tool_display_name() {
2927 let tool = EchoTool;
2928 assert_eq!(tool.display_name(), Some("Echo"));
2929
2930 let def = tool.to_definition();
2931 assert_eq!(def.display_name(), Some("Echo"));
2932 }
2933
2934 #[test]
2935 fn test_all_default_tools_have_display_names() {
2936 let registry = ToolRegistry::with_defaults();
2937 let definitions = registry.tool_definitions();
2938
2939 for def in &definitions {
2940 assert!(
2941 def.display_name().is_some(),
2942 "Tool '{}' should have a display_name",
2943 def.name()
2944 );
2945 }
2946 }
2947
2948 #[tokio::test]
2949 async fn test_tool_registry_as_executor() {
2950 let mut registry = ToolRegistry::new();
2951 registry.register(EchoTool);
2952
2953 let tool_call = ToolCall {
2954 id: "call_1".to_string(),
2955 name: "echo".to_string(),
2956 arguments: serde_json::json!({"message": "test"}),
2957 };
2958
2959 let tool_def = registry.get("echo").unwrap().to_definition();
2960 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2961
2962 assert!(result.error.is_none());
2963 assert_eq!(result.result.unwrap()["echoed"], "test");
2964 }
2965
2966 #[test]
2967 fn test_tool_to_definition() {
2968 let tool = GetCurrentTimeTool;
2969 let def = tool.to_definition();
2970
2971 let ToolDefinition::Builtin(builtin) = def else {
2972 panic!("expected Builtin variant");
2973 };
2974 assert_eq!(builtin.name, "get_current_time");
2975 assert_eq!(builtin.policy, ToolPolicy::Auto);
2976 }
2977
2978 #[test]
2979 fn test_with_defaults_has_expected_tools() {
2980 let registry = ToolRegistry::with_defaults();
2981
2982 assert!(
2984 registry.has("get_current_time"),
2985 "should have get_current_time"
2986 );
2987 assert!(registry.has("echo"), "should have echo");
2988 assert!(
2991 !registry.has("spawn_background"),
2992 "spawn_background must NOT be in defaults — it comes from the \
2993 background_execution capability"
2994 );
2995 assert!(
2996 registry.has("report_progress"),
2997 "should have report_progress"
2998 );
2999
3000 assert!(registry.has("add"), "should have add");
3002 assert!(registry.has("subtract"), "should have subtract");
3003 assert!(registry.has("multiply"), "should have multiply");
3004 assert!(registry.has("divide"), "should have divide");
3005
3006 assert!(registry.has("get_weather"), "should have get_weather");
3008 assert!(registry.has("get_forecast"), "should have get_forecast");
3009
3010 assert!(registry.has("write_todos"), "should have write_todos");
3012
3013 assert!(registry.has("read_file"), "should have read_file");
3015 assert!(registry.has("write_file"), "should have write_file");
3016 assert!(registry.has("edit_file"), "should have edit_file");
3017 assert!(registry.has("list_directory"), "should have list_directory");
3018 assert!(registry.has("grep_files"), "should have grep_files");
3019 assert!(registry.has("delete_file"), "should have delete_file");
3020 assert!(registry.has("stat_file"), "should have stat_file");
3021
3022 assert!(registry.has("web_fetch"), "should have web_fetch");
3024
3025 assert_eq!(registry.len(), 18, "should have 18 default tools");
3027 }
3028
3029 #[tokio::test]
3030 async fn test_with_defaults_tools_are_executable() {
3031 let registry = ToolRegistry::with_defaults();
3032
3033 let tool_call = ToolCall {
3035 id: "call_1".to_string(),
3036 name: "echo".to_string(),
3037 arguments: serde_json::json!({"message": "hello from defaults"}),
3038 };
3039
3040 let tool_def = registry.get("echo").unwrap().to_definition();
3041 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
3042
3043 assert!(result.error.is_none());
3044 assert_eq!(result.result.unwrap()["echoed"], "hello from defaults");
3045 }
3046
3047 #[tokio::test]
3048 async fn test_with_defaults_math_tools() {
3049 let registry = ToolRegistry::with_defaults();
3050
3051 let tool_call = ToolCall {
3053 id: "call_add".to_string(),
3054 name: "add".to_string(),
3055 arguments: serde_json::json!({"a": 5, "b": 3}),
3056 };
3057
3058 let tool_def = registry.get("add").unwrap().to_definition();
3059 let result = registry.execute(&tool_call, &tool_def).await.unwrap();
3060
3061 assert!(result.error.is_none());
3062 assert_eq!(result.result.unwrap()["result"].as_f64().unwrap(), 8.0);
3064 }
3065
3066 #[test]
3070 fn test_with_defaults_excludes_capability_only_tools() {
3071 let registry = ToolRegistry::with_defaults();
3072
3073 assert!(
3075 !registry.has("bash"),
3076 "bash must not be in defaults — it comes from bashkit_shell capability"
3077 );
3078 assert!(
3080 !registry.has("kv_store"),
3081 "kv_store must not be in defaults — it comes from session_storage capability"
3082 );
3083 assert!(
3087 !registry.has("spawn_background"),
3088 "spawn_background must not be in defaults — it comes from the \
3089 background_execution capability (auto-activated by tool hints)"
3090 );
3091 }
3092
3093 #[tokio::test]
3094 async fn test_spawn_background_executes_and_signals_session() {
3095 let session_id = SessionId::new();
3096 let file_store = Arc::new(TestFileStore::default());
3097 let platform_store = Arc::new(TestPlatformStore::default());
3098 let storage_store = Arc::new(NoopStorageStore);
3099 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3100 let tool_registry = ToolRegistry::builder()
3101 .tool(SpawnBackgroundTool)
3102 .tool(TestBackgroundTool)
3103 .build();
3104
3105 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3106 .with_tool_registry(Arc::new(tool_registry))
3107 .with_platform_store(platform_store.clone())
3108 .with_session_task_registry(task_registry.clone());
3109
3110 let tool = SpawnBackgroundTool;
3111 let result = tool
3112 .execute_with_context(
3113 json!({
3114 "tool": "test_background",
3115 "args": { "summary": "Background complete" }
3116 }),
3117 &context,
3118 )
3119 .await;
3120
3121 let ToolExecutionResult::Success(value) = result else {
3122 panic!("spawn_background should succeed");
3123 };
3124 let run_id = value["run_id"].as_str().unwrap().to_string();
3125 let task_id = value["task_id"].as_str().unwrap().to_string();
3126
3127 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3128 loop {
3129 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3130 && task.state == crate::session_task::SessionTaskState::Succeeded
3131 {
3132 break task;
3133 }
3134 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3135 }
3136 })
3137 .await
3138 .expect("background run should complete");
3139 let _ = run_id; let messages = platform_store.sent_messages.lock().unwrap().clone();
3142 assert_eq!(messages.len(), 1);
3143 assert!(messages[0].contains("Background run completed"));
3144
3145 let log_file = file_store
3146 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3147 .await
3148 .unwrap()
3149 .expect("log file");
3150 assert!(
3151 log_file
3152 .content
3153 .as_deref()
3154 .unwrap_or_default()
3155 .contains("hello from background")
3156 );
3157 }
3158
3159 #[tokio::test]
3160 async fn test_spawn_background_persists_failure_artifacts() {
3161 let session_id = SessionId::new();
3162 let file_store = Arc::new(TestFileStore::default());
3163 let storage_store = Arc::new(NoopStorageStore);
3164 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3165 let tool_registry = ToolRegistry::builder()
3166 .tool(SpawnBackgroundTool)
3167 .tool(TestFailingBackgroundTool)
3168 .build();
3169
3170 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3171 .with_tool_registry(Arc::new(tool_registry))
3172 .with_session_task_registry(task_registry.clone());
3173
3174 let result = SpawnBackgroundTool
3175 .execute_with_context(
3176 json!({
3177 "tool": "test_background_fail",
3178 "args": {}
3179 }),
3180 &context,
3181 )
3182 .await;
3183
3184 let ToolExecutionResult::Success(value) = result else {
3185 panic!("spawn_background should succeed");
3186 };
3187 let run_id = value["run_id"].as_str().unwrap().to_string();
3188 let task_id = value["task_id"].as_str().unwrap().to_string();
3189
3190 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3191 loop {
3192 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3193 && task.state == crate::session_task::SessionTaskState::Failed
3194 {
3195 break task;
3196 }
3197 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3198 }
3199 })
3200 .await
3201 .expect("background run should fail");
3202 let _ = run_id;
3203
3204 let log_file = file_store
3205 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3206 .await
3207 .unwrap()
3208 .expect("log file");
3209 assert!(
3210 log_file
3211 .content
3212 .as_deref()
3213 .unwrap_or_default()
3214 .contains("background failed")
3215 );
3216
3217 let result_file = file_store
3218 .read_file(session_id, &format!("/.background/{run_id}/result.json"))
3219 .await
3220 .unwrap()
3221 .expect("result file");
3222 let result_json: Value =
3223 serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
3224 .expect("valid json");
3225 assert_eq!(result_json["status"], "failed");
3226 assert_eq!(result_json["error"], "boom");
3227 }
3228
3229 #[tokio::test]
3230 async fn test_spawn_background_rejects_when_session_active_run_limit_reached() {
3231 let session_id = SessionId::new();
3232 let file_store = Arc::new(TestFileStore::default());
3233 let storage_store = Arc::new(NoopStorageStore);
3234 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3235 let release = StdArc::new(AtomicBool::new(false));
3236 let tool_registry = ToolRegistry::builder()
3237 .tool(SpawnBackgroundTool)
3238 .tool(BlockingBackgroundTool {
3239 release: release.clone(),
3240 })
3241 .build();
3242
3243 let context = ToolContext::with_stores(session_id, file_store, storage_store)
3244 .with_tool_registry(Arc::new(tool_registry))
3245 .with_session_task_registry(task_registry.clone());
3246
3247 let mut task_ids = Vec::new();
3248 for _ in 0..MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
3249 let result = SpawnBackgroundTool
3250 .execute_with_context(
3251 json!({
3252 "tool": "test_background_blocking",
3253 "args": {}
3254 }),
3255 &context,
3256 )
3257 .await;
3258
3259 let ToolExecutionResult::Success(value) = result else {
3260 panic!("background run below the session limit should start");
3261 };
3262 task_ids.push(value["task_id"].as_str().unwrap().to_string());
3263 }
3264
3265 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3267 loop {
3268 let running = task_registry
3269 .list(
3270 session_id,
3271 Some(&crate::session_task::SessionTaskFilter {
3272 kind: Some(crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string()),
3273 state: Some(crate::session_task::SessionTaskState::Running),
3274 }),
3275 )
3276 .await
3277 .unwrap();
3278 if running.len() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
3279 break;
3280 }
3281 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3282 }
3283 })
3284 .await
3285 .expect("background runs should become running");
3286
3287 let result = SpawnBackgroundTool
3288 .execute_with_context(
3289 json!({
3290 "tool": "test_background_blocking",
3291 "args": {}
3292 }),
3293 &context,
3294 )
3295 .await;
3296
3297 let ToolExecutionResult::ToolError(message) = result else {
3298 release.store(true, Ordering::SeqCst);
3299 panic!("spawn_background should reject once the session limit is reached");
3300 };
3301 assert!(message.contains("active background runs per session"));
3302
3303 release.store(true, Ordering::SeqCst);
3304 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3305 for task_id in task_ids {
3306 loop {
3307 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3308 && task.state == crate::session_task::SessionTaskState::Succeeded
3309 {
3310 break;
3311 }
3312 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3313 }
3314 }
3315 })
3316 .await
3317 .expect("blocking background runs should complete after release");
3318
3319 tokio::time::timeout(std::time::Duration::from_secs(1), async {
3324 loop {
3325 if !has_session_background_permits(session_id) {
3326 break;
3327 }
3328 tokio::time::sleep(std::time::Duration::from_millis(5)).await;
3329 }
3330 })
3331 .await
3332 .expect("completed background runs should prune their per-session permit cache entry");
3333 }
3334
3335 #[tokio::test]
3336 async fn test_spawn_background_requires_task_registry() {
3337 let session_id = SessionId::new();
3338 let file_store = Arc::new(TestFileStore::default());
3339 let storage_store = Arc::new(NoopStorageStore);
3340 let tool_registry = ToolRegistry::builder()
3341 .tool(SpawnBackgroundTool)
3342 .tool(TestBackgroundTool)
3343 .build();
3344
3345 let context = ToolContext::with_stores(session_id, file_store, storage_store)
3347 .with_tool_registry(Arc::new(tool_registry));
3348
3349 let result = SpawnBackgroundTool
3350 .execute_with_context(
3351 json!({
3352 "tool": "test_background",
3353 "args": {}
3354 }),
3355 &context,
3356 )
3357 .await;
3358
3359 let ToolExecutionResult::ToolError(message) = result else {
3360 panic!("spawn_background should reject missing task registry");
3361 };
3362 assert!(message.contains("Session task registry not available"));
3363 }
3364
3365 #[tokio::test]
3366 async fn test_spawn_background_requires_file_store() {
3367 let session_id = SessionId::new();
3368 let storage_store = Arc::new(NoopStorageStore);
3369 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3370 let tool_registry = ToolRegistry::builder()
3371 .tool(SpawnBackgroundTool)
3372 .tool(TestBackgroundTool)
3373 .build();
3374
3375 let context = ToolContext::with_storage_store(session_id, storage_store)
3377 .with_tool_registry(Arc::new(tool_registry))
3378 .with_session_task_registry(task_registry);
3379
3380 let result = SpawnBackgroundTool
3381 .execute_with_context(
3382 json!({
3383 "tool": "test_background",
3384 "args": {}
3385 }),
3386 &context,
3387 )
3388 .await;
3389
3390 let ToolExecutionResult::ToolError(message) = result else {
3391 panic!("spawn_background should reject missing file store");
3392 };
3393 assert!(message.contains("Session file store not available"));
3394 }
3395
3396 #[tokio::test]
3397 async fn test_spawn_background_caps_output_log_size() {
3398 let session_id = SessionId::new();
3399 let file_store = Arc::new(TestFileStore::default());
3400 let storage_store = Arc::new(NoopStorageStore);
3401 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3402 let tool_registry = ToolRegistry::builder()
3403 .tool(SpawnBackgroundTool)
3404 .tool(TestLargeOutputBackgroundTool)
3405 .build();
3406
3407 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3408 .with_tool_registry(Arc::new(tool_registry))
3409 .with_session_task_registry(task_registry.clone());
3410
3411 let result = SpawnBackgroundTool
3412 .execute_with_context(
3413 json!({
3414 "tool": "test_background_large_output",
3415 "args": {}
3416 }),
3417 &context,
3418 )
3419 .await;
3420
3421 let ToolExecutionResult::Success(value) = result else {
3422 panic!("spawn_background should succeed");
3423 };
3424 let run_id = value["run_id"].as_str().unwrap().to_string();
3425 let task_id = value["task_id"].as_str().unwrap().to_string();
3426
3427 tokio::time::timeout(std::time::Duration::from_secs(2), async {
3428 loop {
3429 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3430 && task.state == crate::session_task::SessionTaskState::Succeeded
3431 {
3432 break;
3433 }
3434 tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3435 }
3436 })
3437 .await
3438 .expect("background run should complete");
3439 let _ = run_id;
3440
3441 let log_content = file_store
3442 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3443 .await
3444 .unwrap()
3445 .expect("log file")
3446 .content
3447 .unwrap_or_default();
3448
3449 assert!(log_content.contains("[system] background output truncated"));
3450 assert!(log_content.chars().count() <= MAX_BACKGROUND_OUTPUT_LOG_CHARS + 128);
3451 }
3452
3453 #[tokio::test]
3454 async fn test_spawn_background_can_create_scheduled_monitor() {
3455 let session_id = SessionId::new();
3456 let schedule_store = Arc::new(TestScheduleStore::default());
3457 let storage_store = Arc::new(NoopStorageStore);
3458 let tool_registry = ToolRegistry::builder()
3459 .tool(SpawnBackgroundTool)
3460 .tool(TestBackgroundTool)
3461 .build();
3462
3463 let context = ToolContext::with_storage_store(session_id, storage_store)
3464 .with_tool_registry(Arc::new(tool_registry))
3465 .with_schedule_store(schedule_store.clone());
3466
3467 let result = SpawnBackgroundTool
3468 .execute_with_context(
3469 json!({
3470 "tool": "test_background",
3471 "title": "Watch PR 1319",
3472 "args": { "summary": "Background complete" },
3473 "schedule": {
3474 "cron_expression": "*/10 * * * *",
3475 "timezone": "America/Chicago"
3476 }
3477 }),
3478 &context,
3479 )
3480 .await;
3481
3482 let ToolExecutionResult::Success(value) = result else {
3483 panic!("spawn_background should create a schedule: {result:?}");
3484 };
3485
3486 assert_eq!(value["status"], "scheduled");
3487 assert_eq!(value["title"], "Watch PR 1319");
3488 assert_eq!(value["cron_expression"], "*/10 * * * *");
3489 assert_eq!(value["timezone"], "America/Chicago");
3490
3491 let schedules = schedule_store.list_schedules(session_id).await.unwrap();
3492 assert_eq!(schedules.len(), 1);
3493 assert_eq!(
3494 schedules[0].cron_expression.as_deref(),
3495 Some("*/10 * * * *")
3496 );
3497 assert!(schedules[0].description.contains("Monitor: Watch PR 1319"));
3498 assert!(
3499 schedules[0]
3500 .description
3501 .contains("\"summary\": \"Background complete\"")
3502 );
3503 }
3504
3505 #[tokio::test]
3506 async fn test_spawn_background_rejects_invalid_scheduled_at() {
3507 let session_id = SessionId::new();
3508 let storage_store = Arc::new(NoopStorageStore);
3509 let tool_registry = ToolRegistry::builder()
3510 .tool(SpawnBackgroundTool)
3511 .tool(TestBackgroundTool)
3512 .build();
3513 let context = ToolContext::with_storage_store(session_id, storage_store)
3514 .with_tool_registry(Arc::new(tool_registry));
3515
3516 let result = SpawnBackgroundTool
3517 .execute_with_context(
3518 json!({
3519 "tool": "test_background",
3520 "args": {},
3521 "schedule": {
3522 "scheduled_at": "tomorrow at noon"
3523 }
3524 }),
3525 &context,
3526 )
3527 .await;
3528
3529 let ToolExecutionResult::ToolError(message) = result else {
3530 panic!("spawn_background should reject invalid scheduled_at");
3531 };
3532 assert!(message.contains("scheduled_at must be RFC3339"));
3533 }
3534
3535 #[tokio::test]
3536 async fn test_spawn_background_rejects_ambiguous_schedule_shape() {
3537 let session_id = SessionId::new();
3538 let storage_store = Arc::new(NoopStorageStore);
3539 let tool_registry = ToolRegistry::builder()
3540 .tool(SpawnBackgroundTool)
3541 .tool(TestBackgroundTool)
3542 .build();
3543 let context = ToolContext::with_storage_store(session_id, storage_store)
3544 .with_tool_registry(Arc::new(tool_registry));
3545
3546 let result = SpawnBackgroundTool
3547 .execute_with_context(
3548 json!({
3549 "tool": "test_background",
3550 "args": {},
3551 "schedule": {
3552 "cron_expression": "*/10 * * * *",
3553 "scheduled_at": "2026-04-16T15:30:00Z"
3554 }
3555 }),
3556 &context,
3557 )
3558 .await;
3559
3560 let ToolExecutionResult::ToolError(message) = result else {
3561 panic!("spawn_background should reject ambiguous schedule shape");
3562 };
3563 assert!(message.contains("must not include both cron_expression and scheduled_at"));
3564 }
3565
3566 #[test]
3571 fn test_is_canceled_outcome_detects_sentinel() {
3572 let sentinel: std::result::Result<BackgroundOutcome, ToolExecutionResult> = Err(
3574 ToolExecutionResult::ToolError(BACKGROUND_CANCEL_SENTINEL.to_string()),
3575 );
3576 assert!(is_canceled_outcome(&sentinel));
3577 }
3578
3579 #[test]
3580 fn test_is_canceled_outcome_does_not_match_other_errors() {
3581 let other_err: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
3582 Err(ToolExecutionResult::ToolError("boom".to_string()));
3583 assert!(!is_canceled_outcome(&other_err));
3584
3585 let success: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
3586 Ok(BackgroundOutcome {
3587 summary: "ok".to_string(),
3588 result: json!({"ok": true}),
3589 raw_output: None,
3590 });
3591 assert!(!is_canceled_outcome(&success));
3592 }
3593
    /// Background test tool whose background execution sleeps for an hour, so it
    /// does not finish within a test unless it is stopped from outside.
    #[derive(Default)]
    struct SleepingBackgroundTool;
3598
3599 #[async_trait]
3600 impl BackgroundExecutableTool for SleepingBackgroundTool {
3601 async fn execute_background(
3602 &self,
3603 _arguments: Value,
3604 _context: ToolContext,
3605 sink: Arc<dyn BackgroundEventSink>,
3606 ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
3607 sink.status("Sleeping forever")
3608 .await
3609 .map_err(ToolExecutionResult::internal_error)?;
3610 tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
3612 Ok(BackgroundOutcome {
3613 summary: "should not reach here".to_string(),
3614 result: json!({}),
3615 raw_output: None,
3616 })
3617 }
3618 }
3619
    /// `Tool` metadata for the sleeping test tool; it only supports background execution.
    #[async_trait]
    impl Tool for SleepingBackgroundTool {
        /// Name used to address this tool in requests.
        fn name(&self) -> &str {
            "test_background_sleeping"
        }

        /// Human-readable name.
        fn display_name(&self) -> Option<&str> {
            Some("Test Background Sleeping")
        }

        /// Short description of the tool.
        fn description(&self) -> &str {
            "background test tool that sleeps indefinitely"
        }

        /// The tool takes no arguments: an object schema with no properties.
        fn parameters_schema(&self) -> Value {
            json!({
                "type": "object",
                "properties": {}
            })
        }

        /// Foreground execution is not supported and always yields a tool error.
        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
            ToolExecutionResult::tool_error("foreground unsupported")
        }

        /// Marks the tool as supporting background execution.
        fn hints(&self) -> ToolHints {
            ToolHints::default().with_supports_background(true)
        }

        /// Exposes this tool's `BackgroundExecutableTool` implementation.
        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
            Some(self)
        }
    }
3653
    /// In-memory session task registry for tests.
    #[derive(Default)]
    struct InMemoryTaskRegistry {
        // Tasks keyed by task id; the owning session is not part of the key.
        tasks: Mutex<HashMap<String, crate::session_task::SessionTask>>,
    }
3661
    /// `SessionTaskRegistry` backed by the `tasks` map.
    ///
    /// Lookups are by task id only: the `_session_id` arguments are ignored, so
    /// tasks are not isolated per session within one registry instance.
    #[async_trait]
    impl crate::session_task::SessionTaskRegistry for InMemoryTaskRegistry {
        /// Creates a task from `input`, or returns the existing task unchanged
        /// when `input.id` names a task that is already stored.
        async fn create(
            &self,
            input: crate::session_task::CreateSessionTask,
        ) -> crate::Result<crate::session_task::SessionTask> {
            let mut tasks = self.tasks.lock().unwrap();
            // Creating with an id that already exists returns the stored task as-is.
            if let Some(id) = &input.id
                && let Some(existing) = tasks.get(id)
            {
                return Ok(existing.clone());
            }
            let task = crate::session_task::new_session_task(input, chrono::Utc::now());
            tasks.insert(task.id.clone(), task.clone());
            Ok(task)
        }

        /// Applies `update` to the task with `task_id` and returns the updated
        /// copy, or `Ok(None)` when no such task exists.
        async fn update(
            &self,
            _session_id: SessionId,
            task_id: &str,
            update: crate::session_task::SessionTaskUpdate,
        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
            let mut tasks = self.tasks.lock().unwrap();
            let Some(task) = tasks.get_mut(task_id) else {
                return Ok(None);
            };
            crate::session_task::apply_task_update(task, update, chrono::Utc::now());
            Ok(Some(task.clone()))
        }

        /// Returns a copy of the task with `task_id`, if it exists.
        async fn get(
            &self,
            _session_id: SessionId,
            task_id: &str,
        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
            Ok(self.tasks.lock().unwrap().get(task_id).cloned())
        }

        /// Returns copies of all stored tasks that match `filter`.
        ///
        /// A missing filter, or a filter field left as `None`, matches every task.
        async fn list(
            &self,
            _session_id: SessionId,
            filter: Option<&crate::session_task::SessionTaskFilter>,
        ) -> crate::Result<Vec<crate::session_task::SessionTask>> {
            let tasks = self.tasks.lock().unwrap();
            Ok(tasks
                .values()
                .filter(|task| {
                    filter.is_none_or(|f| {
                        f.kind.as_deref().is_none_or(|kind| task.kind == kind)
                            && f.state.is_none_or(|state| task.state == state)
                    })
                })
                .cloned()
                .collect())
        }

        /// Marks the task with `task_id` as having a cancel request and returns
        /// the updated copy, or `Ok(None)` when no such task exists.
        async fn request_cancel(
            &self,
            _session_id: SessionId,
            task_id: &str,
        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
            let mut tasks = self.tasks.lock().unwrap();
            let Some(task) = tasks.get_mut(task_id) else {
                return Ok(None);
            };
            // Keep the timestamp of the first cancel request; repeated requests only bump `updated_at`.
            task.cancel_requested_at
                .get_or_insert_with(chrono::Utc::now);
            task.updated_at = chrono::Utc::now();
            Ok(Some(task.clone()))
        }

        /// Builds a `TaskMessage` for an existing task and returns it.
        ///
        /// The message is not stored. Returns a tool error when `task_id` is unknown.
        async fn record_message(
            &self,
            _session_id: SessionId,
            task_id: &str,
            message: crate::session_task::NewTaskMessage,
        ) -> crate::Result<crate::session_task::TaskMessage> {
            let tasks = self.tasks.lock().unwrap();
            // The lookup only validates that the task exists; its value is unused.
            let _task = tasks
                .get(task_id)
                .ok_or_else(|| crate::AgentLoopError::tool(format!("no task {task_id}")))?;
            Ok(crate::session_task::TaskMessage {
                id: crate::session_task::generate_task_message_id(),
                task_id: task_id.to_string(),
                direction: message.direction,
                content: message.content,
                in_reply_to: message.in_reply_to,
                created_at: chrono::Utc::now(),
            })
        }

        /// Always returns an empty list, since `record_message` stores nothing.
        async fn list_messages(
            &self,
            _session_id: SessionId,
            _task_id: &str,
            _limit: Option<u32>,
            _after_id: Option<&str>,
        ) -> crate::Result<Vec<crate::session_task::TaskMessage>> {
            Ok(Vec::new())
        }
    }
3764
3765 #[tokio::test]
3768 async fn test_cancel_background_run_via_task_registry() {
3769 let session_id = SessionId::new();
3770 let file_store = Arc::new(TestFileStore::default());
3771 let storage_store = Arc::new(NoopStorageStore);
3772 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3773
3774 let tool_registry = ToolRegistry::builder()
3775 .tool(SpawnBackgroundTool)
3776 .tool(SleepingBackgroundTool)
3777 .build();
3778
3779 let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3780 .with_tool_registry(Arc::new(tool_registry))
3781 .with_session_task_registry(task_registry.clone());
3782
3783 let result = SpawnBackgroundTool
3784 .execute_with_context(
3785 json!({
3786 "tool": "test_background_sleeping",
3787 "args": {},
3788 "signal_on_completion": false
3789 }),
3790 &context,
3791 )
3792 .await;
3793
3794 let ToolExecutionResult::Success(value) = result else {
3795 panic!("spawn_background should succeed");
3796 };
3797 let run_id = value["run_id"].as_str().unwrap().to_string();
3798 let task_id = value["task_id"].as_str().unwrap().to_string();
3799
3800 tokio::time::timeout(std::time::Duration::from_secs(5), async {
3802 loop {
3803 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3805 && task.heartbeat_at.is_some()
3806 {
3807 break;
3808 }
3809 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3810 }
3811 })
3812 .await
3813 .expect("background run should start and send at least one heartbeat");
3814
3815 task_registry
3817 .request_cancel(session_id, &task_id)
3818 .await
3819 .expect("request_cancel should succeed");
3820
3821 tokio::time::timeout(std::time::Duration::from_secs(10), async {
3823 loop {
3824 if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3825 && task.state == crate::session_task::SessionTaskState::Canceled
3826 {
3827 break task;
3828 }
3829 tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3830 }
3831 })
3832 .await
3833 .expect("background task should reach Canceled state");
3834
3835 let result_file = file_store
3837 .read_file(session_id, &format!("/.background/{run_id}/result.json"))
3838 .await
3839 .unwrap()
3840 .expect("result.json should exist");
3841 let result_json: Value =
3842 serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
3843 .expect("valid json");
3844 assert_eq!(result_json["status"], "canceled");
3845
3846 let log_file = file_store
3847 .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3848 .await
3849 .unwrap()
3850 .expect("output.log should exist");
3851 assert!(
3852 log_file
3853 .content
3854 .as_deref()
3855 .unwrap_or_default()
3856 .contains("Canceled by request.")
3857 );
3858 }
3859
3860 fn make_reattach_task(spec: serde_json::Value) -> crate::session_task::SessionTask {
3865 use crate::session_task::{SessionTaskState, TaskLinks, TaskWakePolicy};
3866 crate::session_task::SessionTask {
3867 id: "t-reattach".to_string(),
3868 session_id: SessionId::new(),
3869 kind: crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string(),
3870 display_name: "Reattach test".to_string(),
3871 spec,
3872 state: SessionTaskState::Running,
3873 state_detail: None,
3874 progress: None,
3875 input_request: None,
3876 cancel_requested_at: None,
3877 summary: None,
3878 result_path: None,
3879 artifacts: vec![],
3880 error: None,
3881 attempt: 2,
3882 worker_id: None,
3883 heartbeat_at: None,
3884 links: TaskLinks::default(),
3885 wake_policy: TaskWakePolicy::Silent,
3886 created_at: chrono::Utc::now(),
3887 started_at: None,
3888 finished_at: None,
3889 updated_at: chrono::Utc::now(),
3890 }
3891 }
3892
3893 #[tokio::test]
3894 async fn reattach_fails_with_missing_file_store() {
3895 let session_id = SessionId::new();
3896 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3898 let context =
3899 crate::traits::ToolContext::new(session_id).with_session_task_registry(task_registry);
3900 let task = make_reattach_task(serde_json::json!({
3901 "tool": "get_current_time",
3902 "arguments": {},
3903 "reattachable": true,
3904 "signal_on_completion": true,
3905 }));
3906 let err = reattach_background_run(&task, &context)
3907 .await
3908 .expect_err("should fail without file store");
3909 assert!(
3910 err.to_string().contains("file store"),
3911 "error should mention file store, got: {err}"
3912 );
3913 }
3914
3915 #[tokio::test]
3916 async fn reattach_fails_with_missing_task_registry() {
3917 let session_id = SessionId::new();
3918 let file_store = Arc::new(TestFileStore::default());
3919 let storage_store = Arc::new(NoopStorageStore);
3920 let context =
3922 crate::traits::ToolContext::with_stores(session_id, file_store, storage_store);
3923 let task = make_reattach_task(serde_json::json!({
3924 "tool": "get_current_time",
3925 "arguments": {},
3926 "reattachable": true,
3927 "signal_on_completion": true,
3928 }));
3929 let err = reattach_background_run(&task, &context)
3930 .await
3931 .expect_err("should fail without task registry");
3932 assert!(
3933 err.to_string().contains("task registry"),
3934 "error should mention task registry, got: {err}"
3935 );
3936 }
3937
3938 #[tokio::test]
3939 async fn reattach_fails_with_unknown_tool_name() {
3940 let session_id = SessionId::new();
3941 let file_store = Arc::new(TestFileStore::default());
3942 let storage_store = Arc::new(NoopStorageStore);
3943 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3944 let context =
3945 crate::traits::ToolContext::with_stores(session_id, file_store, storage_store)
3946 .with_session_task_registry(task_registry);
3947 let task = make_reattach_task(serde_json::json!({
3949 "tool": "test_background",
3950 "arguments": {},
3951 "reattachable": true,
3952 "signal_on_completion": true,
3953 }));
3954 let err = reattach_background_run(&task, &context)
3955 .await
3956 .expect_err("should fail for unknown tool");
3957 assert!(
3958 err.to_string().contains("not found in built-in registry"),
3959 "error should mention built-in registry, got: {err}"
3960 );
3961 }
3962
3963 #[tokio::test]
3964 async fn reattach_fails_with_missing_tool_spec_field() {
3965 let session_id = SessionId::new();
3966 let file_store = Arc::new(TestFileStore::default());
3967 let storage_store = Arc::new(NoopStorageStore);
3968 let task_registry = Arc::new(InMemoryTaskRegistry::default());
3969 let context =
3970 crate::traits::ToolContext::with_stores(session_id, file_store, storage_store)
3971 .with_session_task_registry(task_registry);
3972 let task = make_reattach_task(serde_json::json!({ "reattachable": true }));
3974 let err = reattach_background_run(&task, &context)
3975 .await
3976 .expect_err("should fail with missing tool field");
3977 assert!(
3978 err.to_string().contains("missing 'tool' field"),
3979 "error should mention missing tool field, got: {err}"
3980 );
3981 }
3982}