Skip to main content

everruns_core/
tools.rs

1// Tool Abstraction for Agent Loop
2//
3// This module provides a high-level abstraction for tools that can be executed
4// by the agent loop. Tools are defined using the `Tool` trait and can be
5// registered with a `ToolRegistry` for use in the loop.
6//
7// Design decisions:
8// - Tools are defined via a trait for flexibility (function-style tools)
9// - ToolRegistry implements ToolExecutor for integration with the agent loop
10// - Error handling distinguishes between user-visible errors and internal errors
11// - Internal errors are logged but not exposed to the LLM (security)
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use tracing::error;
19
20use crate::background::{
21    BackgroundEventSink, BackgroundExecutableTool, BackgroundOutcome, BackgroundProgress,
22};
23use crate::session_resource::{RegisterSessionResource, SessionResourceStatus};
24use crate::session_schedule::MAX_ACTIVE_SCHEDULES_PER_SESSION;
25use crate::tool_types::{
26    BuiltinTool, DeferrablePolicy, ToolCall, ToolDefinition, ToolHints, ToolPolicy, ToolResult,
27};
28use crate::traits::ToolContext;
29use crate::typed_id::SessionId;
30use tokio::sync::Semaphore;
31
32use crate::error::Result;
33use crate::traits::ToolExecutor;
34
35/// Maximum active immediate background runs allowed for a single session.
36///
37/// This mirrors the scheduled monitor cap so model-visible background execution
38/// cannot be used to queue unbounded active worker jobs for one session.
39pub const MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION: usize = 5;
40
41/// Maximum active immediate background runs allowed in this worker process.
42///
43/// The per-session semaphore limits tenant/session abuse; this process-wide
44/// semaphore keeps concurrent sessions from exhausting worker-local resources.
45const MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER: usize = 64;
46static ACTIVE_BACKGROUND_RUNS_PER_WORKER: Semaphore =
47    Semaphore::const_new(MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER);
48
49/// Per-session semaphores that enforce `MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION`.
50///
51/// Using a semaphore rather than a DB count makes the check atomic: concurrent
52/// `spawn_background` calls for the same session cannot both slip through the
53/// guard (check-then-act race) because `try_acquire` is inherently atomic.
54static SESSION_BACKGROUND_PERMITS: OnceLock<std::sync::Mutex<HashMap<SessionId, Arc<Semaphore>>>> =
55    OnceLock::new();
56
57fn session_background_semaphore(session_id: SessionId) -> Arc<Semaphore> {
58    SESSION_BACKGROUND_PERMITS
59        .get_or_init(Default::default)
60        .lock()
61        .unwrap()
62        .entry(session_id)
63        .or_insert_with(|| Arc::new(Semaphore::new(MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION)))
64        .clone()
65}
66
67// ============================================================================
68// Tool Execution Result - Error Handling Contract
69// ============================================================================
70
71/// Image data returned by a tool alongside text results.
72///
73/// This allows tools (built-in or MCP) to return images that are sent
74/// to the LLM as native image content blocks, not stringified JSON.
75#[derive(Debug, Clone, Serialize, Deserialize)]
76pub struct ToolResultImage {
77    /// Base64-encoded image data
78    pub base64: String,
79    /// MIME type (e.g., "image/png", "image/jpeg")
80    pub media_type: String,
81}
82
83/// Result of a tool execution.
84///
85/// This enum distinguishes between different outcomes:
86/// - `Success`: Tool executed successfully, result is returned to LLM
87/// - `SuccessWithImages`: Successful execution with JSON result plus images
88/// - `ToolError`: Tool-level error that should be shown to the LLM
89///   (e.g., "City not found", "Invalid date format")
90/// - `InternalError`: System-level error that should NOT be exposed to the LLM
91///   (e.g., database connection failure, API key issues)
92///
93/// # Security
94///
95/// Internal errors are logged but replaced with a generic message when
96/// returned to the LLM. This prevents leaking sensitive information like
97/// database errors, API keys, or internal system details.
98#[derive(Debug)]
99pub enum ToolExecutionResult {
100    /// Successful execution with a JSON result
101    Success(Value),
102
103    /// Successful execution with a JSON result and images.
104    /// Images are sent to the LLM as native image content blocks
105    /// (not stringified JSON), enabling visual understanding.
106    SuccessWithImages {
107        result: Value,
108        images: Vec<ToolResultImage>,
109    },
110
111    /// Tool-level error that is safe to show to the LLM
112    ///
113    /// Use this for expected error conditions that the LLM should know about,
114    /// such as validation errors, resource not found, etc.
115    ToolError(String),
116
117    /// Internal/system error that should NOT be exposed to the LLM
118    ///
119    /// Use this for unexpected errors like network failures, database errors,
120    /// or other internal issues. The error details will be logged but replaced
121    /// with a generic message when returned to the LLM.
122    InternalError(ToolInternalError),
123
124    /// A user connection is required to execute this tool.
125    ///
126    /// Instead of returning an error, this signals that the workflow should
127    /// pause and ask the client to set up a connection for the given provider.
128    /// The UI renders an inline connection dialog; once the user saves (or
129    /// cancels), a tool result is submitted and execution resumes.
130    ConnectionRequired {
131        /// Connection provider id (e.g. "daytona", "brave_search")
132        provider: String,
133    },
134}
135
136impl ToolExecutionResult {
137    /// Create a successful result
138    pub fn success(value: impl Into<Value>) -> Self {
139        ToolExecutionResult::Success(value.into())
140    }
141
142    /// Create a successful result with pre-truncation raw output for VFS persistence.
143    /// The raw output is transferred to `ToolResult.raw_output` during `into_tool_result()`.
144    pub fn success_with_raw_output(value: impl Into<Value>, raw_output: String) -> Self {
145        let mut value = value.into();
146        // Embed raw output in a sidecar key — extracted in into_tool_result().
147        // Non-object values are wrapped in a scalar carrier so raw_output still
148        // flows through; the carrier is unwrapped on extraction.
149        match value.as_object_mut() {
150            Some(obj) => {
151                obj.insert("_raw_output".to_string(), Value::String(raw_output));
152            }
153            None => {
154                value = serde_json::json!({
155                    "_raw_output_scalar": value,
156                    "_raw_output": raw_output,
157                });
158            }
159        }
160        ToolExecutionResult::Success(value)
161    }
162
163    /// Create a successful result with images
164    pub fn success_with_images(value: impl Into<Value>, images: Vec<ToolResultImage>) -> Self {
165        ToolExecutionResult::SuccessWithImages {
166            result: value.into(),
167            images,
168        }
169    }
170
171    /// Create a tool-level error (safe to show to LLM)
172    pub fn tool_error(message: impl Into<String>) -> Self {
173        ToolExecutionResult::ToolError(message.into())
174    }
175
176    /// Create an internal error (will be hidden from LLM)
177    pub fn internal_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
178        ToolExecutionResult::InternalError(ToolInternalError::new(error))
179    }
180
181    /// Create an internal error from a string message
182    pub fn internal_error_msg(message: impl Into<String>) -> Self {
183        ToolExecutionResult::InternalError(ToolInternalError::from_message(message))
184    }
185
186    /// Signal that a user connection is required before this tool can execute.
187    pub fn connection_required(provider: impl Into<String>) -> Self {
188        ToolExecutionResult::ConnectionRequired {
189            provider: provider.into(),
190        }
191    }
192
193    /// Check if this is a successful result
194    pub fn is_success(&self) -> bool {
195        matches!(
196            self,
197            ToolExecutionResult::Success(_) | ToolExecutionResult::SuccessWithImages { .. }
198        )
199    }
200
201    /// Check if this is an error (either tool error or internal error)
202    pub fn is_error(&self) -> bool {
203        matches!(
204            self,
205            ToolExecutionResult::ToolError(_) | ToolExecutionResult::InternalError(_)
206        )
207    }
208
209    /// Check if this requires a user connection setup
210    pub fn is_connection_required(&self) -> bool {
211        matches!(self, ToolExecutionResult::ConnectionRequired { .. })
212    }
213
214    /// Convert to a ToolResult for the agent loop
215    ///
216    /// Both tool errors and internal errors are packaged as `{"error": "..."}` in the
217    /// result field. This provides a consistent contract where the result field always
218    /// contains the payload, and the agent loop continues the same way for all outcomes.
219    ///
220    /// Internal errors are logged but replaced with a generic message when returned.
221    pub fn into_tool_result(self, tool_call_id: &str, tool_name: &str) -> ToolResult {
222        match self {
223            ToolExecutionResult::Success(mut value) => {
224                // Extract sidecar raw output if present (from success_with_raw_output)
225                let raw_output = value
226                    .as_object_mut()
227                    .and_then(|obj| obj.remove("_raw_output"))
228                    .and_then(|v| v.as_str().map(|s| s.to_string()));
229                // Unwrap scalar carrier only when it matches the exact wrapper shape
230                // set by success_with_raw_output for non-object inputs.
231                let result_value = if let Some(obj) = value.as_object_mut() {
232                    let is_scalar_carrier = raw_output.is_some()
233                        && obj.len() == 1
234                        && obj.contains_key("_raw_output_scalar");
235                    if is_scalar_carrier {
236                        obj.remove("_raw_output_scalar").unwrap_or(Value::Null)
237                    } else {
238                        value
239                    }
240                } else {
241                    value
242                };
243                ToolResult {
244                    tool_call_id: tool_call_id.to_string(),
245                    result: Some(result_value),
246                    images: None,
247                    error: None,
248                    connection_required: None,
249                    raw_output,
250                }
251            }
252            ToolExecutionResult::SuccessWithImages { result, images } => ToolResult {
253                tool_call_id: tool_call_id.to_string(),
254                result: Some(result),
255                images: if images.is_empty() {
256                    None
257                } else {
258                    Some(images)
259                },
260                error: None,
261                connection_required: None,
262                raw_output: None,
263            },
264            ToolExecutionResult::ToolError(message) => ToolResult {
265                tool_call_id: tool_call_id.to_string(),
266                result: Some(serde_json::json!({ "error": &message })),
267                images: None,
268                error: Some(message),
269                connection_required: None,
270                raw_output: None,
271            },
272            ToolExecutionResult::InternalError(err) => {
273                // Log the full error details for debugging
274                error!(
275                    tool_name = %tool_name,
276                    tool_call_id = %tool_call_id,
277                    error = %err.message,
278                    error_chain = %err.chain_string(),
279                    "Tool internal error (details hidden from LLM)"
280                );
281
282                // Return generic error message to LLM, packaged as {"error": "..."}
283                let generic_msg = "An internal error occurred while executing the tool";
284                ToolResult {
285                    tool_call_id: tool_call_id.to_string(),
286                    result: Some(serde_json::json!({
287                        "error": generic_msg
288                    })),
289                    images: None,
290                    error: Some(generic_msg.to_string()),
291                    connection_required: None,
292                    raw_output: None,
293                }
294            }
295            ToolExecutionResult::ConnectionRequired { ref provider } => ToolResult {
296                tool_call_id: tool_call_id.to_string(),
297                result: Some(serde_json::json!({
298                    "connection_required": provider,
299                })),
300                images: None,
301                error: None,
302                connection_required: Some(provider.clone()),
303                raw_output: None,
304            },
305        }
306    }
307}
308
309/// Internal error details (logged but not exposed to LLM)
310#[derive(Debug)]
311pub struct ToolInternalError {
312    /// Error message for logging
313    pub message: String,
314    /// Optional source error
315    pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
316}
317
318impl ToolInternalError {
319    /// Create from an error
320    pub fn new(error: impl std::error::Error + Send + Sync + 'static) -> Self {
321        Self {
322            message: error.to_string(),
323            source: Some(Box::new(error)),
324        }
325    }
326
327    /// Create from a string message
328    pub fn from_message(message: impl Into<String>) -> Self {
329        Self {
330            message: message.into(),
331            source: None,
332        }
333    }
334
335    pub fn chain_string(&self) -> String {
336        let mut parts = vec![self.message.clone()];
337        let mut current = <Self as std::error::Error>::source(self);
338        while let Some(source) = current {
339            let message = source.to_string();
340            if parts.last() != Some(&message) {
341                parts.push(message);
342            }
343            current = source.source();
344        }
345        parts.join(": ")
346    }
347}
348
349impl std::fmt::Display for ToolInternalError {
350    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
351        write!(f, "{}", self.message)
352    }
353}
354
355impl std::error::Error for ToolInternalError {
356    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
357        self.source
358            .as_ref()
359            .map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
360    }
361}
362
363// ============================================================================
364// Tool Trait - Core Tool Abstraction
365// ============================================================================
366
367/// Trait for implementing tools that can be executed by the agent loop.
368///
369/// # Example
370///
371/// ```ignore
372/// use async_trait::async_trait;
373/// use serde_json::{json, Value};
374///
375/// struct GetCurrentTime;
376///
377/// #[async_trait]
378/// impl Tool for GetCurrentTime {
379///     fn name(&self) -> &str {
380///         "get_current_time"
381///     }
382///
383///     fn description(&self) -> &str {
384///         "Get the current date and time"
385///     }
386///
387///     fn parameters_schema(&self) -> Value {
388///         json!({
389///             "type": "object",
390///             "properties": {
391///                 "timezone": {
392///                     "type": "string",
393///                     "description": "Timezone (e.g., 'UTC', 'America/New_York')"
394///                 }
395///             }
396///         })
397///     }
398///
399///     async fn execute(&self, arguments: Value) -> ToolExecutionResult {
400///         let timezone = arguments.get("timezone")
401///             .and_then(|v| v.as_str())
402///             .unwrap_or("UTC");
403///
404///         ToolExecutionResult::success(json!({
405///             "current_time": chrono::Utc::now().to_rfc3339(),
406///             "timezone": timezone
407///         }))
408///     }
409/// }
410/// ```
411#[async_trait]
412pub trait Tool: Send + Sync {
413    /// Returns the tool's unique name.
414    ///
415    /// This name is used by the LLM to invoke the tool and must be unique
416    /// within a ToolRegistry.
417    fn name(&self) -> &str;
418
419    /// Returns a human-readable display name for UI rendering.
420    ///
421    /// This name is shown to users in the UI instead of the technical tool name.
422    /// For example, "Get Current Time" instead of "get_current_time".
423    /// Returns None if no display name is set, in which case the UI may
424    /// fall back to the technical name.
425    fn display_name(&self) -> Option<&str> {
426        None
427    }
428
429    /// Returns a description of what the tool does.
430    ///
431    /// This description is provided to the LLM to help it understand
432    /// when and how to use the tool.
433    fn description(&self) -> &str;
434
435    /// Returns the JSON schema for the tool's parameters.
436    ///
437    /// This schema follows the JSON Schema specification and describes
438    /// the expected arguments for the tool. The LLM uses this to
439    /// generate valid tool calls.
440    fn parameters_schema(&self) -> Value;
441
442    /// Execute the tool with the given arguments.
443    ///
444    /// # Arguments
445    ///
446    /// * `arguments` - The arguments passed to the tool as a JSON value.
447    ///   These should conform to the schema returned by `parameters_schema()`.
448    ///
449    /// # Returns
450    ///
451    /// A `ToolExecutionResult` indicating success, tool error, or internal error.
452    async fn execute(&self, arguments: Value) -> ToolExecutionResult;
453
454    /// Execute the tool with context.
455    ///
456    /// This method provides access to runtime context like session ID and
457    /// optional stores (file store, etc.). Override this method for tools
458    /// that need access to session context or external resources.
459    ///
460    /// The default implementation simply calls `execute()`, ignoring the context.
461    ///
462    /// # Arguments
463    ///
464    /// * `arguments` - The arguments passed to the tool as a JSON value.
465    /// * `context` - Runtime context containing session ID and optional stores.
466    ///
467    /// # Returns
468    ///
469    /// A `ToolExecutionResult` indicating success, tool error, or internal error.
470    async fn execute_with_context(
471        &self,
472        arguments: Value,
473        _context: &ToolContext,
474    ) -> ToolExecutionResult {
475        // Default: delegate to execute(), ignoring context
476        self.execute(arguments).await
477    }
478
479    /// Returns true if this tool requires context for execution.
480    ///
481    /// Tools that need session context (like filesystem tools) should
482    /// override this to return true.
483    fn requires_context(&self) -> bool {
484        false
485    }
486
487    /// Returns the tool policy (auto or requires_approval).
488    ///
489    /// Default is `Auto` which means the tool executes immediately.
490    /// Override to return `RequiresApproval` for sensitive operations.
491    fn policy(&self) -> ToolPolicy {
492        ToolPolicy::Auto
493    }
494
495    /// Returns semantic hints describing the tool's behavioral properties.
496    ///
497    /// Override to provide hints like readonly, destructive, idempotent, etc.
498    /// Default is empty (all hints unspecified).
499    fn hints(&self) -> ToolHints {
500        ToolHints::default()
501    }
502
503    /// Returns native background execution support when this tool opts into
504    /// detached execution via `hints().supports_background`.
505    fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
506        None
507    }
508
509    /// Convert this tool to a ToolDefinition for the agent config.
510    ///
511    /// This is used by ToolRegistry to generate tool definitions
512    /// for the LLM provider.
513    fn to_definition(&self) -> ToolDefinition {
514        ToolDefinition::Builtin(BuiltinTool {
515            name: self.name().to_string(),
516            display_name: self.display_name().map(|s| s.to_string()),
517            description: self.description().to_string(),
518            parameters: self.parameters_schema(),
519            policy: self.policy(),
520            category: None,
521            deferrable: DeferrablePolicy::default(),
522            hints: self.hints(),
523            full_parameters: None,
524        })
525    }
526}
527
528// ============================================================================
529// ToolRegistry - Collection of Tools
530// ============================================================================
531
532/// A registry that holds multiple tools and implements ToolExecutor.
533///
534/// ToolRegistry provides a convenient way to manage multiple tools and
535/// integrate them with the agent loop. It implements `ToolExecutor` so
536/// it can be used directly with `AgentLoop`.
537///
538/// # Example
539///
540/// ```ignore
541/// use everruns_core::tools::{Tool, ToolRegistry};
542///
543/// // Create registry and add tools
544/// let mut registry = ToolRegistry::new();
545/// registry.register(Box::new(GetCurrentTime));
546/// registry.register(Box::new(GetWeather));
547///
548/// // Get tool definitions for agent config
549/// let definitions = registry.tool_definitions();
550///
551/// // Use with agent loop
552/// let agent_loop = AgentLoop::new(config, emitter, store, llm, registry);
553/// ```
554#[derive(Default, Clone)]
555pub struct ToolRegistry {
556    tools: HashMap<String, Arc<dyn Tool>>,
557}
558
559impl ToolRegistry {
560    /// Create a new empty tool registry
561    pub fn new() -> Self {
562        Self {
563            tools: HashMap::new(),
564        }
565    }
566
567    /// Create a tool registry with default built-in tools.
568    ///
569    /// This includes:
570    /// - `get_current_time`: Returns the current date and time
571    /// - `echo`: Echoes back the provided message
572    /// - `report_progress`: Emits deterministic external progress updates
573    /// - TestMath tools: add, subtract, multiply, divide
574    /// - TestWeather tools: get_weather, get_forecast
575    /// - TaskList tools: write_todos
576    /// - FileSystem tools: read_file, write_file, edit_file, list_directory, grep_files, delete_file, stat_file
577    /// - WebFetch tools: web_fetch
578    pub fn with_defaults() -> Self {
579        use crate::capabilities::{
580            AddTool, DeleteFileTool, DivideTool, EditFileTool, GetCurrentTimeTool, GetForecastTool,
581            GetWeatherTool, GrepFilesTool, ListDirectoryTool, MultiplyTool, ReadFileTool,
582            StatFileTool, SubtractTool, WebFetchTool, WriteFileTool, WriteTodosTool,
583        };
584        use crate::progress_reporting::ReportProgressTool;
585
586        ToolRegistry::builder()
587            .tool(GetCurrentTimeTool)
588            .tool(EchoTool)
589            // NOTE: `spawn_background` is intentionally NOT a default tool —
590            // it is contributed by the `background_execution` capability,
591            // which is auto-activated by
592            // `collect_capabilities_with_configs` whenever a collected tool
593            // declares `ToolHints::supports_background = Some(true)`. Keeping
594            // it out of defaults preserves the lockstep contract between
595            // model-visible tools and the worker execution registry: the
596            // executor only knows about `spawn_background` when the model
597            // can also see it.
598            .tool(ReportProgressTool)
599            // TestMath capability tools
600            .tool(AddTool)
601            .tool(SubtractTool)
602            .tool(MultiplyTool)
603            .tool(DivideTool)
604            // TestWeather capability tools
605            .tool(GetWeatherTool)
606            .tool(GetForecastTool)
607            // TaskList capability tools
608            .tool(WriteTodosTool)
609            // FileSystem capability tools
610            .tool(ReadFileTool)
611            .tool(WriteFileTool)
612            .tool(EditFileTool)
613            .tool(ListDirectoryTool)
614            .tool(GrepFilesTool)
615            .tool(DeleteFileTool)
616            .tool(StatFileTool)
617            // WebFetch capability tools
618            .tool(WebFetchTool::default())
619            .build()
620    }
621
622    /// Register a tool with the registry.
623    ///
624    /// If a tool with the same name already exists, it will be replaced.
625    pub fn register(&mut self, tool: impl Tool + 'static) {
626        self.tools.insert(tool.name().to_string(), Arc::new(tool));
627    }
628
629    /// Register a boxed tool
630    pub fn register_boxed(&mut self, tool: Box<dyn Tool>) {
631        self.tools.insert(tool.name().to_string(), Arc::from(tool));
632    }
633
634    /// Register an Arc-wrapped tool
635    pub fn register_arc(&mut self, tool: Arc<dyn Tool>) {
636        self.tools.insert(tool.name().to_string(), tool);
637    }
638
639    /// Get a tool by name
640    pub fn get(&self, name: &str) -> Option<&Arc<dyn Tool>> {
641        self.tools.get(name)
642    }
643
644    /// Check if a tool is registered
645    pub fn has(&self, name: &str) -> bool {
646        self.tools.contains_key(name)
647    }
648
649    /// Get the number of registered tools
650    pub fn len(&self) -> usize {
651        self.tools.len()
652    }
653
654    /// Check if the registry is empty
655    pub fn is_empty(&self) -> bool {
656        self.tools.is_empty()
657    }
658
659    /// Get all tool names
660    pub fn tool_names(&self) -> Vec<&str> {
661        self.tools.keys().map(|s| s.as_str()).collect()
662    }
663
664    /// Get tool definitions for use in RuntimeAgent.
665    ///
666    /// Returns a Vec of ToolDefinition that can be passed to
667    /// `RuntimeAgent::with_tools()`.
668    pub fn tool_definitions(&self) -> Vec<ToolDefinition> {
669        self.tools.values().map(|t| t.to_definition()).collect()
670    }
671
672    /// Remove a tool from the registry
673    pub fn unregister(&mut self, name: &str) -> Option<Arc<dyn Tool>> {
674        self.tools.remove(name)
675    }
676
677    /// Clear all tools from the registry
678    pub fn clear(&mut self) {
679        self.tools.clear();
680    }
681
682    /// Create a builder for fluent tool registration
683    pub fn builder() -> ToolRegistryBuilder {
684        ToolRegistryBuilder::new()
685    }
686}
687
688impl std::fmt::Debug for ToolRegistry {
689    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
690        f.debug_struct("ToolRegistry")
691            .field("tools", &self.tool_names())
692            .finish()
693    }
694}
695
696#[async_trait]
697impl ToolExecutor for ToolRegistry {
698    async fn execute(
699        &self,
700        tool_call: &ToolCall,
701        _tool_def: &ToolDefinition,
702    ) -> Result<ToolResult> {
703        let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
704            crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
705        })?;
706
707        let result = tool.execute(tool_call.arguments.clone()).await;
708        Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
709    }
710
711    async fn execute_with_context(
712        &self,
713        tool_call: &ToolCall,
714        _tool_def: &ToolDefinition,
715        context: &ToolContext,
716    ) -> Result<ToolResult> {
717        let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
718            crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
719        })?;
720
721        // Use execute_with_context for all tools - context-aware tools will use it,
722        // regular tools will delegate to execute() via the default implementation
723        let result = tool
724            .execute_with_context(tool_call.arguments.clone(), context)
725            .await;
726        Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
727    }
728}
729
730// ============================================================================
731// ToolRegistryBuilder - Fluent API for Building Registry
732// ============================================================================
733
734/// Builder for creating a ToolRegistry with a fluent API.
735///
736/// # Example
737///
738/// ```ignore
739/// let registry = ToolRegistry::builder()
740///     .tool(GetCurrentTime)
741///     .tool(GetWeather)
742///     .build();
743/// ```
744pub struct ToolRegistryBuilder {
745    registry: ToolRegistry,
746}
747
748impl ToolRegistryBuilder {
749    /// Create a new builder
750    pub fn new() -> Self {
751        Self {
752            registry: ToolRegistry::new(),
753        }
754    }
755
756    /// Add a tool to the registry
757    pub fn tool(mut self, tool: impl Tool + 'static) -> Self {
758        self.registry.register(tool);
759        self
760    }
761
762    /// Add a boxed tool to the registry
763    pub fn tool_boxed(mut self, tool: Box<dyn Tool>) -> Self {
764        self.registry.register_boxed(tool);
765        self
766    }
767
768    /// Add an Arc-wrapped tool to the registry
769    pub fn tool_arc(mut self, tool: Arc<dyn Tool>) -> Self {
770        self.registry.register_arc(tool);
771        self
772    }
773
774    /// Build the registry
775    pub fn build(self) -> ToolRegistry {
776        self.registry
777    }
778}
779
780impl Default for ToolRegistryBuilder {
781    fn default() -> Self {
782        Self::new()
783    }
784}
785
786// ============================================================================
787// Built-in Tools
788// ============================================================================
789
790/// A tool that echoes back its arguments (useful for testing)
791pub struct EchoTool;
792
793#[async_trait]
794impl Tool for EchoTool {
795    fn name(&self) -> &str {
796        "echo"
797    }
798
799    fn display_name(&self) -> Option<&str> {
800        Some("Echo")
801    }
802
803    fn description(&self) -> &str {
804        "Echo back the provided message. Useful for testing tool execution."
805    }
806
807    fn parameters_schema(&self) -> Value {
808        serde_json::json!({
809            "type": "object",
810            "properties": {
811                "message": {
812                    "type": "string",
813                    "description": "The message to echo back"
814                }
815            },
816            "required": ["message"],
817            "additionalProperties": false
818        })
819    }
820
821    fn hints(&self) -> ToolHints {
822        ToolHints::default()
823            .with_readonly(true)
824            .with_idempotent(true)
825    }
826
827    async fn execute(&self, arguments: Value) -> ToolExecutionResult {
828        let message = arguments
829            .get("message")
830            .and_then(|v| v.as_str())
831            .unwrap_or("");
832
833        ToolExecutionResult::success(serde_json::json!({
834            "echoed": message,
835            "length": message.len()
836        }))
837    }
838}
839
840/// Spawn a background-capable tool and return immediately with a run handle.
841pub struct SpawnBackgroundTool;
842
843#[derive(Debug, Clone)]
844struct BackgroundScheduleRequest {
845    cron_expression: Option<String>,
846    scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
847    timezone: String,
848}
849
850fn parse_background_schedule(
851    arguments: &Value,
852) -> std::result::Result<Option<BackgroundScheduleRequest>, String> {
853    let Some(schedule) = arguments.get("schedule") else {
854        return Ok(None);
855    };
856    let Some(schedule) = schedule.as_object() else {
857        return Err("schedule must be an object".to_string());
858    };
859
860    let cron_expression = schedule
861        .get("cron_expression")
862        .and_then(Value::as_str)
863        .map(str::trim)
864        .filter(|value| !value.is_empty())
865        .map(ToString::to_string);
866    let scheduled_at = match schedule.get("scheduled_at").and_then(Value::as_str) {
867        Some(value) => {
868            let value = value.trim();
869            if value.is_empty() {
870                None
871            } else {
872                Some(
873                    chrono::DateTime::parse_from_rfc3339(value)
874                        .map_err(|_| "scheduled_at must be RFC3339".to_string())?
875                        .with_timezone(&chrono::Utc),
876                )
877            }
878        }
879        None => None,
880    };
881
882    match (cron_expression.is_some(), scheduled_at.is_some()) {
883        (false, false) => {
884            return Err(
885                "schedule must include exactly one of cron_expression (recurring) or scheduled_at (one-shot)"
886                    .to_string(),
887            );
888        }
889        (true, true) => {
890            return Err(
891                "schedule must not include both cron_expression and scheduled_at; provide exactly one"
892                    .to_string(),
893            );
894        }
895        _ => {}
896    }
897
898    let timezone = schedule
899        .get("timezone")
900        .and_then(Value::as_str)
901        .map(str::trim)
902        .filter(|value| !value.is_empty())
903        .unwrap_or("UTC")
904        .to_string();
905
906    Ok(Some(BackgroundScheduleRequest {
907        cron_expression,
908        scheduled_at,
909        timezone,
910    }))
911}
912
913fn build_background_schedule_description(
914    tool_name: &str,
915    tool_args: &Value,
916    title: &str,
917    signal_on_completion: bool,
918) -> String {
919    let payload = json!({
920        "tool": tool_name,
921        "title": title,
922        "signal_on_completion": signal_on_completion,
923        "args": tool_args,
924    });
925    let payload_json =
926        serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
927
928    format!(
929        "Monitor: {title}\n\n\
930This scheduled monitor fired. Start the background run now.\n\n\
931spawn_background payload:\n{payload_json}"
932    )
933}
934
935#[async_trait]
936impl Tool for SpawnBackgroundTool {
937    fn name(&self) -> &str {
938        "spawn_background"
939    }
940
941    fn display_name(&self) -> Option<&str> {
942        Some("Spawn Background")
943    }
944
945    fn description(&self) -> &str {
946        "Run a background-capable built-in tool asynchronously. Returns immediately and signals the session when the background run completes."
947    }
948
949    fn parameters_schema(&self) -> Value {
950        json!({
951            "type": "object",
952            "properties": {
953                "tool": {
954                    "type": "string",
955                    "description": "Name of the built-in tool to execute in the background"
956                },
957                "args": {
958                    "type": "object",
959                    "description": "Arguments to pass to the target tool"
960                },
961                "title": {
962                    "type": "string",
963                    "description": "Optional human-readable label for the background run"
964                },
965                "schedule": {
966                    "type": "object",
967                    "description": "Optional session schedule. When provided, this creates a scheduled monitor instead of starting the run immediately.",
968                    "properties": {
969                        "cron_expression": {
970                            "type": "string",
971                            "description": "Standard 5-field cron expression for recurring runs (e.g. '*/10 * * * *' for every 10 minutes)"
972                        },
973                        "scheduled_at": {
974                            "type": "string",
975                            "description": "ISO 8601 datetime for a one-shot run (e.g. '2026-04-16T15:30:00Z')"
976                        },
977                        "timezone": {
978                            "type": "string",
979                            "description": "IANA timezone for the schedule. Default: UTC"
980                        }
981                    },
982                    "additionalProperties": false
983                },
984                "signal_on_completion": {
985                    "type": "boolean",
986                    "description": "Send a synthetic user message back to the session when the run completes",
987                    "default": true
988                }
989            },
990            "required": ["tool", "args"],
991            "additionalProperties": false
992        })
993    }
994
995    async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
996        ToolExecutionResult::tool_error(
997            "spawn_background requires context. This tool must be executed with session context.",
998        )
999    }
1000
1001    async fn execute_with_context(
1002        &self,
1003        arguments: Value,
1004        context: &ToolContext,
1005    ) -> ToolExecutionResult {
1006        let tool_name = match arguments.get("tool").and_then(|v| v.as_str()) {
1007            Some(name) if !name.trim().is_empty() => name.trim(),
1008            _ => return ToolExecutionResult::tool_error("Missing required parameter: tool"),
1009        };
1010        let tool_args = match arguments.get("args") {
1011            Some(args) if args.is_object() => args.clone(),
1012            _ => {
1013                return ToolExecutionResult::tool_error(
1014                    "Missing required parameter: args (object expected)",
1015                );
1016            }
1017        };
1018        let signal_on_completion = arguments
1019            .get("signal_on_completion")
1020            .and_then(|v| v.as_bool())
1021            .unwrap_or(true);
1022        let schedule_request = match parse_background_schedule(&arguments) {
1023            Ok(schedule) => schedule,
1024            Err(message) => return ToolExecutionResult::tool_error(message),
1025        };
1026
1027        let Some(tool_registry) = &context.tool_registry else {
1028            return ToolExecutionResult::tool_error(
1029                "Tool registry not available in this context. spawn_background requires worker-side tool execution.",
1030            );
1031        };
1032
1033        let Some(tool) = tool_registry.get(tool_name).cloned() else {
1034            return ToolExecutionResult::tool_error(format!("Unknown tool: {tool_name}"));
1035        };
1036        if tool_name == self.name() {
1037            return ToolExecutionResult::tool_error(
1038                "spawn_background cannot target itself recursively",
1039            );
1040        }
1041        if tool.hints().supports_background != Some(true) {
1042            return ToolExecutionResult::tool_error(format!(
1043                "Tool does not support background execution: {tool_name}"
1044            ));
1045        }
1046        if tool.as_background_executable().is_none() {
1047            return ToolExecutionResult::tool_error(format!(
1048                "Tool declared background support but has no background executor: {tool_name}"
1049            ));
1050        }
1051        let title = arguments
1052            .get("title")
1053            .and_then(|v| v.as_str())
1054            .map(str::trim)
1055            .filter(|s| !s.is_empty())
1056            .map(|s| s.to_string())
1057            .unwrap_or_else(|| {
1058                tool.display_name()
1059                    .map(ToString::to_string)
1060                    .unwrap_or_else(|| format!("Background {tool_name}"))
1061            });
1062
1063        if let Some(schedule_request) = schedule_request {
1064            let Some(schedule_store) = &context.schedule_store else {
1065                return ToolExecutionResult::tool_error(
1066                    "Schedule store not available in this context. Scheduled monitors require session schedules.",
1067                );
1068            };
1069
1070            match schedule_store
1071                .count_active_schedules(context.session_id)
1072                .await
1073            {
1074                Ok(count) if count >= MAX_ACTIVE_SCHEDULES_PER_SESSION => {
1075                    return ToolExecutionResult::tool_error(format!(
1076                        "Maximum {MAX_ACTIVE_SCHEDULES_PER_SESSION} active schedules per session. Cancel an existing schedule first."
1077                    ));
1078                }
1079                Err(err) => return ToolExecutionResult::internal_error(err),
1080                _ => {}
1081            }
1082
1083            let description = build_background_schedule_description(
1084                tool_name,
1085                &tool_args,
1086                &title,
1087                signal_on_completion,
1088            );
1089
1090            return match schedule_store
1091                .create_schedule(
1092                    context.session_id,
1093                    description,
1094                    schedule_request.cron_expression.clone(),
1095                    schedule_request.scheduled_at,
1096                    schedule_request.timezone.clone(),
1097                )
1098                .await
1099            {
1100                Ok(schedule) => ToolExecutionResult::success(json!({
1101                    "created": true,
1102                    "status": "scheduled",
1103                    "title": title,
1104                    "tool": tool_name,
1105                    "signal_on_completion": signal_on_completion,
1106                    "schedule_id": schedule.id.to_string(),
1107                    "schedule_type": schedule.schedule_type,
1108                    "cron_expression": schedule.cron_expression,
1109                    "scheduled_at": schedule.scheduled_at,
1110                    "timezone": schedule.timezone,
1111                    "next_trigger_at": schedule.next_trigger_at,
1112                    "enabled": schedule.enabled
1113                })),
1114                Err(err) => ToolExecutionResult::internal_error(err),
1115            };
1116        }
1117
1118        let Some(resource_registry) = &context.session_resource_registry else {
1119            return ToolExecutionResult::tool_error(
1120                "Session resource registry not available in this context",
1121            );
1122        };
1123        if context.file_store.is_none() {
1124            return ToolExecutionResult::tool_error(
1125                "Session file store not available in this context. spawn_background requires artifact persistence.",
1126            );
1127        }
1128
1129        let background_run_permit = match ACTIVE_BACKGROUND_RUNS_PER_WORKER.try_acquire() {
1130            Ok(permit) => permit,
1131            Err(_) => {
1132                return ToolExecutionResult::tool_error(format!(
1133                    "Worker is already running the maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER} active background runs. Try again after an existing run finishes."
1134                ));
1135            }
1136        };
1137
1138        let session_run_permit = match session_background_semaphore(context.session_id)
1139            .try_acquire_owned()
1140        {
1141            Ok(permit) => permit,
1142            Err(_) => {
1143                return ToolExecutionResult::tool_error(format!(
1144                    "Maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION} active background runs per session. Wait for an existing run to finish before starting another."
1145                ));
1146            }
1147        };
1148
1149        let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
1150        let artifact_dir = format!("/.background/{run_id}");
1151        let log_path = format!("{artifact_dir}/output.log");
1152        let result_path = format!("{artifact_dir}/result.json");
1153        let metadata = json!({
1154            "tool": tool_name,
1155            "status_text": "Queued",
1156            "signal_on_completion": signal_on_completion,
1157            "artifact_dir": artifact_dir,
1158            "log_path": log_path,
1159            "result_path": result_path,
1160        });
1161
1162        if let Err(e) = resource_registry
1163            .register(RegisterSessionResource {
1164                session_id: context.session_id,
1165                resource_id: run_id.clone(),
1166                kind: "background_run".to_string(),
1167                display_name: title.clone(),
1168                status: SessionResourceStatus::Active,
1169                metadata,
1170            })
1171            .await
1172        {
1173            return ToolExecutionResult::internal_error_msg(format!(
1174                "Failed to register background run: {e}"
1175            ));
1176        }
1177
1178        let background_context = context.clone().with_tool_registry(tool_registry.clone());
1179        let sink = Arc::new(SessionBackgroundSink::new(
1180            background_context.clone(),
1181            run_id.clone(),
1182            title.clone(),
1183            tool_name.to_string(),
1184            log_path.clone(),
1185            result_path.clone(),
1186            signal_on_completion,
1187        ));
1188        let run_id_for_task = run_id.clone();
1189        let tool_for_task = tool.clone();
1190        let tool_name_for_task = tool_name.to_string();
1191
1192        tokio::spawn(async move {
1193            let _background_run_permit = background_run_permit;
1194            let _session_run_permit = session_run_permit;
1195            let _ = sink.status("Starting").await;
1196            let outcome = match tool_for_task.as_background_executable() {
1197                Some(background_tool) => {
1198                    background_tool
1199                        .execute_background(tool_args, background_context, sink.clone())
1200                        .await
1201                }
1202                None => Err(ToolExecutionResult::tool_error(format!(
1203                    "Tool declared background support but has no background executor: {}",
1204                    tool_name_for_task
1205                ))),
1206            };
1207
1208            if let Err(err) = sink.finalize(outcome).await {
1209                tracing::warn!(
1210                    run_id = run_id_for_task,
1211                    error = %err,
1212                    "Background run finalization failed"
1213                );
1214            }
1215        });
1216
1217        ToolExecutionResult::success(json!({
1218            "run_id": run_id,
1219            "resource_id": run_id,
1220            "title": title,
1221            "tool": tool_name,
1222            "status": "running",
1223            "signal_on_completion": signal_on_completion,
1224            "artifact_dir": artifact_dir,
1225            "log_path": log_path,
1226            "result_path": result_path
1227        }))
1228    }
1229
1230    fn requires_context(&self) -> bool {
1231        true
1232    }
1233}
1234
1235#[derive(Debug, Default)]
1236struct SessionBackgroundState {
1237    status_text: String,
1238    progress: Option<BackgroundProgress>,
1239    output_tail: String,
1240    output_log: String,
1241    output_log_chars: usize,
1242    output_log_truncated: bool,
1243}
1244
1245const MAX_BACKGROUND_OUTPUT_LOG_CHARS: usize = 256 * 1024;
1246
1247struct SessionBackgroundSink {
1248    context: ToolContext,
1249    run_id: String,
1250    display_name: String,
1251    tool_name: String,
1252    log_path: String,
1253    result_path: String,
1254    signal_on_completion: bool,
1255    state: tokio::sync::Mutex<SessionBackgroundState>,
1256}
1257
1258impl SessionBackgroundSink {
1259    fn new(
1260        context: ToolContext,
1261        run_id: String,
1262        display_name: String,
1263        tool_name: String,
1264        log_path: String,
1265        result_path: String,
1266        signal_on_completion: bool,
1267    ) -> Self {
1268        Self {
1269            context,
1270            run_id,
1271            display_name,
1272            tool_name,
1273            log_path,
1274            result_path,
1275            signal_on_completion,
1276            state: tokio::sync::Mutex::new(SessionBackgroundState {
1277                status_text: "Queued".to_string(),
1278                ..Default::default()
1279            }),
1280        }
1281    }
1282
1283    async fn finalize(
1284        &self,
1285        outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1286    ) -> Result<()> {
1287        match outcome {
1288            Ok(outcome) => {
1289                let output_log = if let Some(raw_output) = &outcome.raw_output {
1290                    raw_output.clone()
1291                } else {
1292                    let state = self.state.lock().await;
1293                    Self::final_output_log(&state)
1294                };
1295                self.write_text_file(&self.log_path, &output_log).await?;
1296                let result_json = serde_json::to_string_pretty(&outcome.result)
1297                    .unwrap_or_else(|_| outcome.result.to_string());
1298                self.write_text_file(&self.result_path, &result_json)
1299                    .await?;
1300
1301                let mut state = self.state.lock().await;
1302                state.status_text = "Completed".to_string();
1303                drop(state);
1304                self.update_resource(SessionResourceStatus::Completed, Some(&outcome.summary))
1305                    .await?;
1306                if self.signal_on_completion {
1307                    self.signal_session("completed", &outcome.summary).await?;
1308                }
1309            }
1310            Err(err) => {
1311                let message = match err {
1312                    ToolExecutionResult::ToolError(msg) => msg,
1313                    ToolExecutionResult::InternalError(inner) => inner.message,
1314                    ToolExecutionResult::ConnectionRequired { provider } => {
1315                        format!("Background tool requires connection setup: {provider}")
1316                    }
1317                    ToolExecutionResult::Success(_)
1318                    | ToolExecutionResult::SuccessWithImages { .. } => {
1319                        "Background run ended unexpectedly".to_string()
1320                    }
1321                };
1322                let output_log = {
1323                    let state = self.state.lock().await;
1324                    Self::final_output_log(&state)
1325                };
1326                self.write_text_file(&self.log_path, &output_log).await?;
1327                let error_json = serde_json::to_string_pretty(&json!({
1328                    "status": "failed",
1329                    "error": &message,
1330                }))
1331                .unwrap_or_else(|_| {
1332                    json!({
1333                        "status": "failed",
1334                        "error": &message,
1335                    })
1336                    .to_string()
1337                });
1338                self.write_text_file(&self.result_path, &error_json).await?;
1339                let mut state = self.state.lock().await;
1340                state.status_text = "Failed".to_string();
1341                drop(state);
1342                self.update_resource(SessionResourceStatus::Failed, Some(&message))
1343                    .await?;
1344                if self.signal_on_completion {
1345                    self.signal_session("failed", &message).await?;
1346                }
1347            }
1348        }
1349
1350        Ok(())
1351    }
1352
1353    async fn signal_session(&self, status: &str, summary: &str) -> Result<()> {
1354        let Some(platform_store) = &self.context.platform_store else {
1355            return Ok(());
1356        };
1357        let message = format!(
1358            "Background run {status}.\n- run_id: {}\n- title: {}\n- tool: {}\n- summary: {}\n- result_path: {}\n- log_path: {}",
1359            self.run_id,
1360            self.display_name,
1361            self.tool_name,
1362            summary,
1363            self.result_path,
1364            self.log_path
1365        );
1366        platform_store
1367            .send_message(self.context.session_id, &message)
1368            .await
1369    }
1370
1371    async fn update_resource(
1372        &self,
1373        status: SessionResourceStatus,
1374        summary: Option<&str>,
1375    ) -> Result<()> {
1376        let Some(registry) = &self.context.session_resource_registry else {
1377            return Ok(());
1378        };
1379        let state = self.state.lock().await;
1380        let status_text = state.status_text.clone();
1381        let progress = state.progress.clone();
1382        let output_tail = state.output_tail.clone();
1383        drop(state);
1384        registry
1385            .register(RegisterSessionResource {
1386                session_id: self.context.session_id,
1387                resource_id: self.run_id.clone(),
1388                kind: "background_run".to_string(),
1389                display_name: self.display_name.clone(),
1390                status,
1391                metadata: json!({
1392                    "tool": self.tool_name,
1393                    "status_text": status_text,
1394                    "progress": progress,
1395                    "output_tail": output_tail,
1396                    "log_path": self.log_path,
1397                    "result_path": self.result_path,
1398                    "summary": summary,
1399                    "signal_on_completion": self.signal_on_completion,
1400                }),
1401            })
1402            .await?;
1403        Ok(())
1404    }
1405
1406    async fn write_text_file(&self, path: &str, content: &str) -> Result<()> {
1407        let file_store = self.context.file_store.as_ref().ok_or_else(|| {
1408            anyhow::anyhow!(
1409                "background run {} cannot persist artifact {} because no session file store is configured",
1410                self.run_id,
1411                path
1412            )
1413        })?;
1414
1415        ensure_directory(file_store.as_ref(), self.context.session_id, "/.background").await?;
1416        let run_dir = format!("/.background/{}", self.run_id);
1417        ensure_directory(file_store.as_ref(), self.context.session_id, &run_dir).await?;
1418        file_store
1419            .write_file(self.context.session_id, path, content, "text")
1420            .await?;
1421        Ok(())
1422    }
1423}
1424
1425#[async_trait]
1426impl BackgroundEventSink for SessionBackgroundSink {
1427    async fn status(&self, message: &str) -> Result<()> {
1428        let mut state = self.state.lock().await;
1429        state.status_text = message.to_string();
1430        drop(state);
1431        self.update_resource(SessionResourceStatus::Active, None)
1432            .await
1433    }
1434
1435    async fn output(&self, stream: &str, delta: &str) -> Result<()> {
1436        let mut state = self.state.lock().await;
1437        if !delta.is_empty() {
1438            let prefix = format!("[{stream}] ");
1439            state.output_tail.push_str(&prefix);
1440            state.output_tail.push_str(delta);
1441            Self::append_to_output_log(&mut state, &prefix, delta);
1442            if state.output_tail.chars().count() > 2048 {
1443                state.output_tail = state
1444                    .output_tail
1445                    .chars()
1446                    .rev()
1447                    .take(2048)
1448                    .collect::<Vec<_>>()
1449                    .into_iter()
1450                    .rev()
1451                    .collect();
1452            }
1453        }
1454        drop(state);
1455        self.update_resource(SessionResourceStatus::Active, None)
1456            .await
1457    }
1458
1459    async fn progress(&self, progress: BackgroundProgress) -> Result<()> {
1460        let mut state = self.state.lock().await;
1461        state.progress = Some(progress);
1462        drop(state);
1463        self.update_resource(SessionResourceStatus::Active, None)
1464            .await
1465    }
1466}
1467
1468impl SessionBackgroundSink {
1469    fn append_to_output_log(state: &mut SessionBackgroundState, prefix: &str, delta: &str) {
1470        if state.output_log_chars >= MAX_BACKGROUND_OUTPUT_LOG_CHARS {
1471            state.output_log_truncated = true;
1472            return;
1473        }
1474
1475        let chunk = format!("{prefix}{delta}");
1476        let remaining = MAX_BACKGROUND_OUTPUT_LOG_CHARS - state.output_log_chars;
1477        let chunk_chars = chunk.chars().count();
1478
1479        if chunk_chars <= remaining {
1480            state.output_log.push_str(&chunk);
1481            state.output_log_chars += chunk_chars;
1482            return;
1483        }
1484
1485        let truncated_chunk: String = chunk.chars().take(remaining).collect();
1486        state.output_log.push_str(&truncated_chunk);
1487        state.output_log_chars += truncated_chunk.chars().count();
1488        state.output_log_truncated = true;
1489    }
1490
1491    fn final_output_log(state: &SessionBackgroundState) -> String {
1492        if !state.output_log_truncated {
1493            return state.output_log.clone();
1494        }
1495
1496        format!(
1497            "{}\n[system] background output truncated at {} characters\n",
1498            state.output_log, MAX_BACKGROUND_OUTPUT_LOG_CHARS
1499        )
1500    }
1501}
1502
1503async fn ensure_directory(
1504    file_store: &dyn crate::traits::SessionFileSystem,
1505    session_id: crate::SessionId,
1506    path: &str,
1507) -> Result<()> {
1508    if let Some(entry) = file_store.stat_file(session_id, path).await? {
1509        if entry.is_directory {
1510            return Ok(());
1511        }
1512        return Err(anyhow::anyhow!("path exists but is not a directory: {path}").into());
1513    }
1514    let _ = file_store.create_directory(session_id, path).await?;
1515    Ok(())
1516}
1517
1518/// A tool that always fails (useful for testing error handling)
1519pub struct FailingTool {
1520    error_message: String,
1521    use_internal_error: bool,
1522}
1523
1524impl FailingTool {
1525    /// Create a failing tool with a tool-level error
1526    pub fn with_tool_error(message: impl Into<String>) -> Self {
1527        Self {
1528            error_message: message.into(),
1529            use_internal_error: false,
1530        }
1531    }
1532
1533    /// Create a failing tool with an internal error
1534    pub fn with_internal_error(message: impl Into<String>) -> Self {
1535        Self {
1536            error_message: message.into(),
1537            use_internal_error: true,
1538        }
1539    }
1540}
1541
1542impl Default for FailingTool {
1543    fn default() -> Self {
1544        Self::with_tool_error("Tool execution failed")
1545    }
1546}
1547
1548#[async_trait]
1549impl Tool for FailingTool {
1550    fn name(&self) -> &str {
1551        "failing_tool"
1552    }
1553
1554    fn display_name(&self) -> Option<&str> {
1555        Some("Failing Tool")
1556    }
1557
1558    fn description(&self) -> &str {
1559        "A tool that always fails (for testing error handling)"
1560    }
1561
1562    fn parameters_schema(&self) -> Value {
1563        serde_json::json!({
1564            "type": "object",
1565            "properties": {},
1566            "additionalProperties": false
1567        })
1568    }
1569
1570    fn hints(&self) -> ToolHints {
1571        ToolHints::default()
1572            .with_readonly(true)
1573            .with_idempotent(true)
1574    }
1575
1576    async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1577        if self.use_internal_error {
1578            ToolExecutionResult::internal_error_msg(&self.error_message)
1579        } else {
1580            ToolExecutionResult::tool_error(&self.error_message)
1581        }
1582    }
1583}
1584
1585// ============================================================================
1586// Tests
1587// ============================================================================
1588
1589#[cfg(test)]
1590mod tests {
1591    use super::*;
1592    use crate::capabilities::GetCurrentTimeTool;
1593    use crate::platform_store::PlatformStore;
1594    use crate::session_file::{FileInfo, FileStat, SessionFile};
1595    use crate::session_resource::{SessionResourceEntry, SessionResourceFilter};
1596    use crate::traits::{SessionFileSystem, SessionResourceRegistry, SessionScheduleStore};
1597    use crate::typed_id::{HarnessId, SessionId};
1598    use crate::{AgentId, KeyInfo, PlatformMessage, SecretInfo};
1599    use async_trait::async_trait;
1600    use std::sync::{
1601        Arc as StdArc, Mutex,
1602        atomic::{AtomicBool, Ordering},
1603    };
1604
1605    #[derive(Default)]
1606    struct TestBackgroundTool;
1607
1608    #[async_trait]
1609    impl BackgroundExecutableTool for TestBackgroundTool {
1610        async fn execute_background(
1611            &self,
1612            arguments: Value,
1613            _context: ToolContext,
1614            sink: Arc<dyn BackgroundEventSink>,
1615        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1616            sink.status("Waiting for test result")
1617                .await
1618                .map_err(ToolExecutionResult::internal_error)?;
1619            sink.output("stdout", "hello from background")
1620                .await
1621                .map_err(ToolExecutionResult::internal_error)?;
1622            sink.progress(BackgroundProgress {
1623                current: Some(1),
1624                total: Some(1),
1625                unit: Some("step".to_string()),
1626                label: Some("done".to_string()),
1627            })
1628            .await
1629            .map_err(ToolExecutionResult::internal_error)?;
1630
1631            Ok(BackgroundOutcome {
1632                summary: arguments["summary"].as_str().unwrap_or("done").to_string(),
1633                result: json!({"ok": true}),
1634                raw_output: None,
1635            })
1636        }
1637    }
1638
1639    #[async_trait]
1640    impl Tool for TestBackgroundTool {
1641        fn name(&self) -> &str {
1642            "test_background"
1643        }
1644
1645        fn display_name(&self) -> Option<&str> {
1646            Some("Test Background")
1647        }
1648
1649        fn description(&self) -> &str {
1650            "test tool"
1651        }
1652
1653        fn parameters_schema(&self) -> Value {
1654            json!({
1655                "type": "object",
1656                "properties": {
1657                    "summary": { "type": "string" }
1658                }
1659            })
1660        }
1661
1662        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1663            ToolExecutionResult::tool_error("foreground unsupported")
1664        }
1665
1666        fn hints(&self) -> ToolHints {
1667            ToolHints::default().with_supports_background(true)
1668        }
1669
1670        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1671            Some(self)
1672        }
1673    }
1674
1675    #[derive(Default)]
1676    struct TestFailingBackgroundTool;
1677
1678    #[async_trait]
1679    impl BackgroundExecutableTool for TestFailingBackgroundTool {
1680        async fn execute_background(
1681            &self,
1682            _arguments: Value,
1683            _context: ToolContext,
1684            sink: Arc<dyn BackgroundEventSink>,
1685        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1686            sink.status("Running failing test")
1687                .await
1688                .map_err(ToolExecutionResult::internal_error)?;
1689            sink.output("stderr", "background failed")
1690                .await
1691                .map_err(ToolExecutionResult::internal_error)?;
1692            Err(ToolExecutionResult::tool_error("boom"))
1693        }
1694    }
1695
1696    #[async_trait]
1697    impl Tool for TestFailingBackgroundTool {
1698        fn name(&self) -> &str {
1699            "test_background_fail"
1700        }
1701
1702        fn display_name(&self) -> Option<&str> {
1703            Some("Test Background Fail")
1704        }
1705
1706        fn description(&self) -> &str {
1707            "failing background test tool"
1708        }
1709
1710        fn parameters_schema(&self) -> Value {
1711            json!({
1712                "type": "object",
1713                "properties": {}
1714            })
1715        }
1716
1717        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1718            ToolExecutionResult::tool_error("foreground unsupported")
1719        }
1720
1721        fn hints(&self) -> ToolHints {
1722            ToolHints::default().with_supports_background(true)
1723        }
1724
1725        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1726            Some(self)
1727        }
1728    }
1729
1730    #[derive(Default)]
1731    struct TestLargeOutputBackgroundTool;
1732
1733    #[async_trait]
1734    impl BackgroundExecutableTool for TestLargeOutputBackgroundTool {
1735        async fn execute_background(
1736            &self,
1737            _arguments: Value,
1738            _context: ToolContext,
1739            sink: Arc<dyn BackgroundEventSink>,
1740        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1741            let large_chunk = "x".repeat(MAX_BACKGROUND_OUTPUT_LOG_CHARS + 4096);
1742            sink.output("stdout", &large_chunk)
1743                .await
1744                .map_err(ToolExecutionResult::internal_error)?;
1745            Ok(BackgroundOutcome {
1746                summary: "large output complete".to_string(),
1747                result: json!({"ok": true}),
1748                raw_output: None,
1749            })
1750        }
1751    }
1752
1753    #[async_trait]
1754    impl Tool for TestLargeOutputBackgroundTool {
1755        fn name(&self) -> &str {
1756            "test_background_large_output"
1757        }
1758
1759        fn display_name(&self) -> Option<&str> {
1760            Some("Test Background Large Output")
1761        }
1762
1763        fn description(&self) -> &str {
1764            "background test tool with huge output"
1765        }
1766
1767        fn parameters_schema(&self) -> Value {
1768            json!({
1769                "type": "object",
1770                "properties": {}
1771            })
1772        }
1773
1774        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1775            ToolExecutionResult::tool_error("foreground unsupported")
1776        }
1777
1778        fn hints(&self) -> ToolHints {
1779            ToolHints::default().with_supports_background(true)
1780        }
1781
1782        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1783            Some(self)
1784        }
1785    }
1786
1787    struct BlockingBackgroundTool {
1788        release: StdArc<AtomicBool>,
1789    }
1790
1791    #[async_trait]
1792    impl BackgroundExecutableTool for BlockingBackgroundTool {
1793        async fn execute_background(
1794            &self,
1795            _arguments: Value,
1796            _context: ToolContext,
1797            sink: Arc<dyn BackgroundEventSink>,
1798        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
1799            sink.status("Blocking until released")
1800                .await
1801                .map_err(ToolExecutionResult::internal_error)?;
1802            while !self.release.load(Ordering::SeqCst) {
1803                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
1804            }
1805            Ok(BackgroundOutcome {
1806                summary: "released".to_string(),
1807                result: json!({"ok": true}),
1808                raw_output: None,
1809            })
1810        }
1811    }
1812
1813    #[async_trait]
1814    impl Tool for BlockingBackgroundTool {
1815        fn name(&self) -> &str {
1816            "test_background_blocking"
1817        }
1818
1819        fn display_name(&self) -> Option<&str> {
1820            Some("Test Background Blocking")
1821        }
1822
1823        fn description(&self) -> &str {
1824            "background test tool that waits for test release"
1825        }
1826
1827        fn parameters_schema(&self) -> Value {
1828            json!({
1829                "type": "object",
1830                "properties": {}
1831            })
1832        }
1833
1834        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1835            ToolExecutionResult::tool_error("foreground unsupported")
1836        }
1837
1838        fn hints(&self) -> ToolHints {
1839            ToolHints::default().with_supports_background(true)
1840        }
1841
1842        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
1843            Some(self)
1844        }
1845    }
1846
1847    #[derive(Default)]
1848    struct TestSessionResourceRegistry {
1849        entries: Mutex<HashMap<String, SessionResourceEntry>>,
1850    }
1851
1852    #[async_trait]
1853    impl crate::traits::SessionResourceRegistry for TestSessionResourceRegistry {
1854        async fn register(
1855            &self,
1856            entry: RegisterSessionResource,
1857        ) -> crate::Result<SessionResourceEntry> {
1858            let stored = SessionResourceEntry {
1859                resource_id: entry.resource_id.clone(),
1860                session_id: entry.session_id,
1861                kind: entry.kind,
1862                display_name: entry.display_name,
1863                status: entry.status,
1864                metadata: entry.metadata,
1865                created_at: chrono::Utc::now(),
1866                updated_at: chrono::Utc::now(),
1867            };
1868            self.entries
1869                .lock()
1870                .unwrap()
1871                .insert(entry.resource_id, stored.clone());
1872            Ok(stored)
1873        }
1874
1875        async fn update_status(
1876            &self,
1877            _session_id: SessionId,
1878            resource_id: &str,
1879            status: SessionResourceStatus,
1880        ) -> crate::Result<Option<SessionResourceEntry>> {
1881            let mut entries = self.entries.lock().unwrap();
1882            if let Some(entry) = entries.get_mut(resource_id) {
1883                entry.status = status;
1884                entry.updated_at = chrono::Utc::now();
1885                return Ok(Some(entry.clone()));
1886            }
1887            Ok(None)
1888        }
1889
1890        async fn get(
1891            &self,
1892            _session_id: SessionId,
1893            resource_id: &str,
1894        ) -> crate::Result<Option<SessionResourceEntry>> {
1895            Ok(self.entries.lock().unwrap().get(resource_id).cloned())
1896        }
1897
1898        async fn list(
1899            &self,
1900            session_id: SessionId,
1901            filter: Option<&SessionResourceFilter>,
1902        ) -> crate::Result<Vec<SessionResourceEntry>> {
1903            Ok(self
1904                .entries
1905                .lock()
1906                .unwrap()
1907                .values()
1908                .filter(|entry| entry.session_id == session_id)
1909                .filter(|entry| {
1910                    filter.is_none_or(|filter| {
1911                        filter.kind.as_ref().is_none_or(|kind| &entry.kind == kind)
1912                            && filter.status.is_none_or(|status| entry.status == status)
1913                    })
1914                })
1915                .cloned()
1916                .collect())
1917        }
1918
1919        async fn deregister(
1920            &self,
1921            _session_id: SessionId,
1922            resource_id: &str,
1923        ) -> crate::Result<bool> {
1924            Ok(self.entries.lock().unwrap().remove(resource_id).is_some())
1925        }
1926    }
1927
1928    #[derive(Default)]
1929    struct TestFileStore {
1930        files: Mutex<HashMap<String, SessionFile>>,
1931    }
1932
1933    #[async_trait]
1934    impl crate::traits::SessionFileSystem for TestFileStore {
1935        async fn read_file(
1936            &self,
1937            _session_id: SessionId,
1938            path: &str,
1939        ) -> crate::Result<Option<SessionFile>> {
1940            Ok(self.files.lock().unwrap().get(path).cloned())
1941        }
1942
1943        async fn write_file(
1944            &self,
1945            session_id: SessionId,
1946            path: &str,
1947            content: &str,
1948            encoding: &str,
1949        ) -> crate::Result<SessionFile> {
1950            let now = chrono::Utc::now();
1951            let file = SessionFile {
1952                id: uuid::Uuid::now_v7(),
1953                session_id: session_id.uuid(),
1954                path: path.to_string(),
1955                name: FileInfo::name_from_path(path),
1956                content: Some(content.to_string()),
1957                encoding: encoding.to_string(),
1958                is_directory: false,
1959                is_readonly: false,
1960                size_bytes: content.len() as i64,
1961                created_at: now,
1962                updated_at: now,
1963            };
1964            self.files
1965                .lock()
1966                .unwrap()
1967                .insert(path.to_string(), file.clone());
1968            Ok(file)
1969        }
1970
1971        async fn delete_file(
1972            &self,
1973            _session_id: SessionId,
1974            _path: &str,
1975            _recursive: bool,
1976        ) -> crate::Result<bool> {
1977            Ok(false)
1978        }
1979
1980        async fn list_directory(
1981            &self,
1982            _session_id: SessionId,
1983            _path: &str,
1984        ) -> crate::Result<Vec<FileInfo>> {
1985            Ok(Vec::new())
1986        }
1987
1988        async fn stat_file(
1989            &self,
1990            _session_id: SessionId,
1991            path: &str,
1992        ) -> crate::Result<Option<FileStat>> {
1993            let file = self.files.lock().unwrap().get(path).cloned();
1994            Ok(file.map(|entry| FileStat {
1995                path: entry.path,
1996                name: entry.name,
1997                is_directory: entry.is_directory,
1998                is_readonly: entry.is_readonly,
1999                size_bytes: entry.size_bytes,
2000                created_at: entry.created_at,
2001                updated_at: entry.updated_at,
2002            }))
2003        }
2004
2005        async fn grep_files(
2006            &self,
2007            _session_id: SessionId,
2008            _pattern: &str,
2009            _path_pattern: Option<&str>,
2010        ) -> crate::Result<Vec<crate::session_file::GrepMatch>> {
2011            Ok(Vec::new())
2012        }
2013
2014        async fn create_directory(
2015            &self,
2016            session_id: SessionId,
2017            path: &str,
2018        ) -> crate::Result<FileInfo> {
2019            let now = chrono::Utc::now();
2020            let id = uuid::Uuid::now_v7();
2021            let dir = SessionFile {
2022                id,
2023                session_id: session_id.uuid(),
2024                path: path.to_string(),
2025                name: FileInfo::name_from_path(path),
2026                content: None,
2027                encoding: "text".to_string(),
2028                is_directory: true,
2029                is_readonly: false,
2030                size_bytes: 0,
2031                created_at: now,
2032                updated_at: now,
2033            };
2034            self.files.lock().unwrap().insert(path.to_string(), dir);
2035            Ok(FileInfo {
2036                id,
2037                session_id: session_id.uuid(),
2038                path: path.to_string(),
2039                name: FileInfo::name_from_path(path),
2040                is_directory: true,
2041                is_readonly: false,
2042                size_bytes: 0,
2043                created_at: now,
2044                updated_at: now,
2045            })
2046        }
2047    }
2048
2049    #[derive(Default)]
2050    struct TestPlatformStore {
2051        sent_messages: Mutex<Vec<String>>,
2052    }
2053
2054    #[async_trait]
2055    impl PlatformStore for TestPlatformStore {
2056        async fn list_harnesses(&self) -> crate::Result<Vec<crate::Harness>> {
2057            Ok(Vec::new())
2058        }
2059        async fn get_harness(&self, _id: HarnessId) -> crate::Result<Option<crate::Harness>> {
2060            Ok(None)
2061        }
2062        async fn create_harness(
2063            &self,
2064            _name: &str,
2065            _display_name: Option<&str>,
2066            _description: Option<&str>,
2067            _system_prompt: &str,
2068            _parent_harness_id: Option<HarnessId>,
2069            _capabilities: &[String],
2070        ) -> crate::Result<crate::Harness> {
2071            unreachable!()
2072        }
2073        async fn update_harness(
2074            &self,
2075            _id: HarnessId,
2076            _name: Option<&str>,
2077            _display_name: Option<&str>,
2078            _description: Option<&str>,
2079            _system_prompt: Option<&str>,
2080            _parent_harness_id: Option<Option<HarnessId>>,
2081        ) -> crate::Result<crate::Harness> {
2082            unreachable!()
2083        }
2084        async fn delete_harness(&self, _id: HarnessId) -> crate::Result<()> {
2085            Ok(())
2086        }
2087        async fn copy_harness(
2088            &self,
2089            _id: HarnessId,
2090            _new_name: Option<&str>,
2091        ) -> crate::Result<crate::Harness> {
2092            unreachable!()
2093        }
2094        async fn list_agents(&self) -> crate::Result<Vec<crate::Agent>> {
2095            Ok(Vec::new())
2096        }
2097        async fn get_agent_by_id(&self, _id: AgentId) -> crate::Result<Option<crate::Agent>> {
2098            Ok(None)
2099        }
2100        async fn create_agent(
2101            &self,
2102            _name: &str,
2103            _display_name: Option<&str>,
2104            _description: Option<&str>,
2105            _system_prompt: &str,
2106            _capabilities: &[String],
2107        ) -> crate::Result<crate::Agent> {
2108            unreachable!()
2109        }
2110        async fn update_agent(
2111            &self,
2112            _id: AgentId,
2113            _name: Option<&str>,
2114            _display_name: Option<&str>,
2115            _description: Option<&str>,
2116            _system_prompt: Option<&str>,
2117        ) -> crate::Result<crate::Agent> {
2118            unreachable!()
2119        }
2120        async fn delete_agent(&self, _id: AgentId) -> crate::Result<()> {
2121            Ok(())
2122        }
2123        async fn list_apps(
2124            &self,
2125            _search: Option<&str>,
2126            _include_archived: bool,
2127        ) -> crate::Result<Vec<crate::App>> {
2128            Ok(Vec::new())
2129        }
2130        async fn get_app(&self, _id: crate::AppId) -> crate::Result<Option<crate::App>> {
2131            Ok(None)
2132        }
2133        async fn create_app(
2134            &self,
2135            _name: &str,
2136            _description: Option<&str>,
2137            _harness_id: HarnessId,
2138            _agent_id: Option<AgentId>,
2139            _agent_identity_id: Option<crate::AgentIdentityId>,
2140            _channel_type: Option<crate::ChannelType>,
2141            _channel_config: Option<&serde_json::Value>,
2142        ) -> crate::Result<crate::App> {
2143            unreachable!()
2144        }
2145        async fn update_app(
2146            &self,
2147            _id: crate::AppId,
2148            _name: Option<&str>,
2149            _description: Option<&str>,
2150            _harness_id: Option<HarnessId>,
2151            _agent_id: Option<AgentId>,
2152            _agent_identity_id: Option<Option<crate::AgentIdentityId>>,
2153        ) -> crate::Result<crate::App> {
2154            unreachable!()
2155        }
2156        async fn delete_app(&self, _id: crate::AppId) -> crate::Result<()> {
2157            Ok(())
2158        }
2159        async fn destroy_app(&self, _id: crate::AppId) -> crate::Result<()> {
2160            Ok(())
2161        }
2162        async fn publish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
2163            unreachable!()
2164        }
2165        async fn unpublish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
2166            unreachable!()
2167        }
2168        async fn add_app_channel(
2169            &self,
2170            _app_id: crate::AppId,
2171            _channel_type: crate::ChannelType,
2172            _channel_config: Option<&serde_json::Value>,
2173            _enabled: Option<bool>,
2174        ) -> crate::Result<crate::AppChannel> {
2175            unreachable!()
2176        }
2177        async fn update_app_channel(
2178            &self,
2179            _app_id: crate::AppId,
2180            _channel_id: crate::AppChannelId,
2181            _channel_type: Option<crate::ChannelType>,
2182            _channel_config: Option<&serde_json::Value>,
2183            _enabled: Option<bool>,
2184        ) -> crate::Result<crate::AppChannel> {
2185            unreachable!()
2186        }
2187        async fn delete_app_channel(
2188            &self,
2189            _app_id: crate::AppId,
2190            _channel_id: crate::AppChannelId,
2191        ) -> crate::Result<()> {
2192            Ok(())
2193        }
2194        async fn list_sessions(
2195            &self,
2196            _limit: Option<usize>,
2197            _agent_id: Option<AgentId>,
2198        ) -> crate::Result<Vec<crate::Session>> {
2199            Ok(Vec::new())
2200        }
2201        async fn create_session(
2202            &self,
2203            _harness_id: HarnessId,
2204            _agent_id: Option<AgentId>,
2205            _title: Option<&str>,
2206            _locale: Option<&str>,
2207            _blueprint_id: Option<&str>,
2208            _blueprint_config: Option<&serde_json::Value>,
2209        ) -> crate::Result<crate::Session> {
2210            unreachable!()
2211        }
2212        async fn get_session_by_id(&self, _id: SessionId) -> crate::Result<Option<crate::Session>> {
2213            Ok(None)
2214        }
2215        async fn get_session_context_report(
2216            &self,
2217            id: SessionId,
2218        ) -> crate::Result<crate::SessionContextReport> {
2219            Ok(crate::SessionContextReport {
2220                session_id: id.to_string(),
2221                model: "llmsim".to_string(),
2222                context_window_tokens: None,
2223                estimated_input_tokens: 0,
2224                sections: vec![],
2225                contributions: vec![],
2226                cumulative_usage: None,
2227            })
2228        }
2229        async fn set_subagent_metadata(
2230            &self,
2231            _session_id: SessionId,
2232            _parent_session_id: SessionId,
2233            _subagent_name: &str,
2234            _subagent_task: &str,
2235            _subagent_status: crate::session::SubagentStatus,
2236        ) -> crate::Result<crate::Session> {
2237            unreachable!()
2238        }
2239        async fn delete_session(&self, _id: SessionId) -> crate::Result<()> {
2240            Ok(())
2241        }
2242        async fn send_message(&self, _session_id: SessionId, content: &str) -> crate::Result<()> {
2243            self.sent_messages.lock().unwrap().push(content.to_string());
2244            Ok(())
2245        }
2246        async fn get_messages(
2247            &self,
2248            _session_id: SessionId,
2249            _limit: Option<usize>,
2250        ) -> crate::Result<Vec<PlatformMessage>> {
2251            Ok(Vec::new())
2252        }
2253        async fn wait_for_idle(
2254            &self,
2255            _session_id: SessionId,
2256            _timeout_secs: Option<u64>,
2257        ) -> crate::Result<String> {
2258            Ok("idle".to_string())
2259        }
2260        async fn list_capabilities(
2261            &self,
2262            _search: Option<&str>,
2263        ) -> crate::Result<Vec<crate::CapabilityInfo>> {
2264            Ok(Vec::new())
2265        }
2266        fn base_url(&self) -> &str {
2267            "http://localhost:9300"
2268        }
2269    }
2270
2271    #[derive(Default)]
2272    struct NoopStorageStore;
2273
2274    #[derive(Default)]
2275    struct TestScheduleStore {
2276        schedules: Mutex<Vec<crate::session_schedule::SessionSchedule>>,
2277    }
2278
2279    #[async_trait]
2280    impl crate::traits::SessionScheduleStore for TestScheduleStore {
2281        async fn create_schedule(
2282            &self,
2283            session_id: SessionId,
2284            description: String,
2285            cron_expression: Option<String>,
2286            scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
2287            timezone: String,
2288        ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2289            let schedule = crate::session_schedule::SessionSchedule {
2290                id: crate::typed_id::ScheduleId::new(),
2291                session_id,
2292                owner_principal_id: crate::PrincipalId::from_seed(1),
2293                resolved_owner_user_id: None,
2294                owner: None,
2295                effective_owner: None,
2296                description,
2297                cron_expression: cron_expression.clone(),
2298                scheduled_at,
2299                timezone,
2300                enabled: true,
2301                schedule_type: crate::session_schedule::SessionSchedule::derive_type(
2302                    &cron_expression,
2303                ),
2304                next_trigger_at: Some(chrono::Utc::now() + chrono::Duration::minutes(10)),
2305                last_triggered_at: None,
2306                trigger_count: 0,
2307                created_at: chrono::Utc::now(),
2308                updated_at: chrono::Utc::now(),
2309            };
2310            self.schedules.lock().unwrap().push(schedule.clone());
2311            Ok(schedule)
2312        }
2313
2314        async fn cancel_schedule(
2315            &self,
2316            _session_id: SessionId,
2317            schedule_id: crate::ScheduleId,
2318        ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2319            let mut schedules = self.schedules.lock().unwrap();
2320            let schedule = schedules
2321                .iter_mut()
2322                .find(|schedule| schedule.id == schedule_id)
2323                .ok_or_else(|| crate::AgentLoopError::tool("Schedule not found".to_string()))?;
2324            schedule.enabled = false;
2325            Ok(schedule.clone())
2326        }
2327
2328        async fn list_schedules(
2329            &self,
2330            session_id: SessionId,
2331        ) -> crate::Result<Vec<crate::session_schedule::SessionSchedule>> {
2332            Ok(self
2333                .schedules
2334                .lock()
2335                .unwrap()
2336                .iter()
2337                .filter(|schedule| schedule.session_id == session_id)
2338                .cloned()
2339                .collect())
2340        }
2341
2342        async fn count_active_schedules(&self, session_id: SessionId) -> crate::Result<u32> {
2343            Ok(self
2344                .schedules
2345                .lock()
2346                .unwrap()
2347                .iter()
2348                .filter(|schedule| schedule.session_id == session_id && schedule.enabled)
2349                .count() as u32)
2350        }
2351    }
2352
2353    #[async_trait]
2354    impl crate::traits::SessionStorageStore for NoopStorageStore {
2355        async fn set_value(
2356            &self,
2357            _session_id: SessionId,
2358            _key: &str,
2359            _value: &str,
2360        ) -> crate::Result<()> {
2361            Ok(())
2362        }
2363        async fn get_value(
2364            &self,
2365            _session_id: SessionId,
2366            _key: &str,
2367        ) -> crate::Result<Option<String>> {
2368            Ok(None)
2369        }
2370        async fn delete_value(&self, _session_id: SessionId, _key: &str) -> crate::Result<bool> {
2371            Ok(false)
2372        }
2373        async fn list_keys(&self, _session_id: SessionId) -> crate::Result<Vec<KeyInfo>> {
2374            Ok(Vec::new())
2375        }
2376        async fn set_secret(
2377            &self,
2378            _session_id: SessionId,
2379            _name: &str,
2380            _value: &str,
2381        ) -> crate::Result<()> {
2382            Ok(())
2383        }
2384        async fn get_secret(
2385            &self,
2386            _session_id: SessionId,
2387            _name: &str,
2388        ) -> crate::Result<Option<String>> {
2389            Ok(None)
2390        }
2391        async fn delete_secret(&self, _session_id: SessionId, _name: &str) -> crate::Result<bool> {
2392            Ok(false)
2393        }
2394        async fn list_secrets(&self, _session_id: SessionId) -> crate::Result<Vec<SecretInfo>> {
2395            Ok(Vec::new())
2396        }
2397    }
2398
2399    #[tokio::test]
2400    async fn test_echo_tool() {
2401        let tool = EchoTool;
2402
2403        let result = tool
2404            .execute(serde_json::json!({"message": "Hello, world!"}))
2405            .await;
2406
2407        if let ToolExecutionResult::Success(value) = result {
2408            assert_eq!(
2409                value.get("echoed").unwrap().as_str().unwrap(),
2410                "Hello, world!"
2411            );
2412            assert_eq!(value.get("length").unwrap().as_u64().unwrap(), 13);
2413        } else {
2414            panic!("Expected success");
2415        }
2416    }
2417
2418    #[tokio::test]
2419    async fn test_failing_tool_with_tool_error() {
2420        let tool = FailingTool::with_tool_error("Something went wrong");
2421
2422        let result = tool.execute(serde_json::json!({})).await;
2423
2424        if let ToolExecutionResult::ToolError(msg) = result {
2425            assert_eq!(msg, "Something went wrong");
2426        } else {
2427            panic!("Expected tool error");
2428        }
2429    }
2430
2431    #[tokio::test]
2432    async fn test_failing_tool_with_internal_error() {
2433        let tool = FailingTool::with_internal_error("Database connection failed");
2434
2435        let result = tool.execute(serde_json::json!({})).await;
2436
2437        if let ToolExecutionResult::InternalError(err) = result {
2438            assert_eq!(err.message, "Database connection failed");
2439        } else {
2440            panic!("Expected internal error");
2441        }
2442    }
2443
2444    #[tokio::test]
2445    async fn test_tool_result_conversion() {
2446        // Success
2447        let result = ToolExecutionResult::success(serde_json::json!({"value": 42}));
2448        let tool_result = result.into_tool_result("call_1", "test_tool");
2449        assert!(tool_result.error.is_none());
2450        assert_eq!(tool_result.result.unwrap()["value"], 42);
2451
2452        // Tool error (packaged as {"error": "..."} in result field, also sets error)
2453        let result = ToolExecutionResult::tool_error("Invalid input");
2454        let tool_result = result.into_tool_result("call_2", "test_tool");
2455        assert_eq!(tool_result.error.as_deref(), Some("Invalid input"));
2456        assert_eq!(
2457            tool_result.result.unwrap(),
2458            serde_json::json!({"error": "Invalid input"})
2459        );
2460
2461        // Internal error (packaged as {"error": "..."} with generic message)
2462        let result = ToolExecutionResult::internal_error_msg("Secret database error");
2463        let tool_result = result.into_tool_result("call_3", "test_tool");
2464        assert_eq!(
2465            tool_result.error.as_deref(),
2466            Some("An internal error occurred while executing the tool")
2467        );
2468        assert_eq!(
2469            tool_result.result.unwrap(),
2470            serde_json::json!({"error": "An internal error occurred while executing the tool"})
2471        );
2472    }
2473
2474    #[tokio::test]
2475    async fn test_tool_registry() {
2476        let mut registry = ToolRegistry::new();
2477        registry.register(GetCurrentTimeTool);
2478        registry.register(EchoTool);
2479
2480        assert_eq!(registry.len(), 2);
2481        assert!(registry.has("get_current_time"));
2482        assert!(registry.has("echo"));
2483        assert!(!registry.has("nonexistent"));
2484
2485        let definitions = registry.tool_definitions();
2486        assert_eq!(definitions.len(), 2);
2487    }
2488
2489    #[tokio::test]
2490    async fn test_tool_registry_builder() {
2491        let registry = ToolRegistry::builder()
2492            .tool(GetCurrentTimeTool)
2493            .tool(EchoTool)
2494            .build();
2495
2496        assert_eq!(registry.len(), 2);
2497    }
2498
2499    #[test]
2500    fn test_tool_display_name_in_definition() {
2501        // GetCurrentTimeTool has display_name "Get Current Time"
2502        let tool = GetCurrentTimeTool;
2503        assert_eq!(tool.display_name(), Some("Get Current Time"));
2504
2505        let def = tool.to_definition();
2506        assert_eq!(def.display_name(), Some("Get Current Time"));
2507    }
2508
2509    #[test]
2510    fn test_success_with_raw_output_object_preserves_shape() {
2511        let res = ToolExecutionResult::success_with_raw_output(
2512            serde_json::json!({"stdout": "hello"}),
2513            "raw stdout bytes".to_string(),
2514        );
2515        let tr = res.into_tool_result("call_1", "demo");
2516        assert_eq!(tr.result.as_ref().unwrap()["stdout"], "hello");
2517        assert!(
2518            tr.result
2519                .as_ref()
2520                .unwrap()
2521                .as_object()
2522                .unwrap()
2523                .get("_raw_output")
2524                .is_none(),
2525            "sidecar key must not leak to the LLM-visible result"
2526        );
2527        assert_eq!(tr.raw_output.as_deref(), Some("raw stdout bytes"));
2528    }
2529
2530    #[test]
2531    fn test_success_with_raw_output_scalar_unwraps_to_string() {
2532        let res = ToolExecutionResult::success_with_raw_output(
2533            "compact summary".to_string(),
2534            "full output bytes".to_string(),
2535        );
2536        let tr = res.into_tool_result("call_1", "demo");
2537        assert_eq!(
2538            tr.result,
2539            Some(serde_json::Value::String("compact summary".into()))
2540        );
2541        assert_eq!(tr.raw_output.as_deref(), Some("full output bytes"));
2542    }
2543
2544    #[test]
2545    fn test_success_result_with_raw_output_scalar_key_is_not_unwrapped() {
2546        let res = ToolExecutionResult::success(
2547            serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}),
2548        );
2549        let tr = res.into_tool_result("call_1", "demo");
2550        assert_eq!(
2551            tr.result,
2552            Some(serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}))
2553        );
2554        assert_eq!(tr.raw_output, None);
2555    }
2556
2557    #[test]
2558    fn test_success_result_with_only_raw_output_scalar_key_is_not_unwrapped() {
2559        // Single-key object with _raw_output_scalar must not be mistaken for a
2560        // success_with_raw_output carrier when raw_output is absent.
2561        let res = ToolExecutionResult::success(serde_json::json!({"_raw_output_scalar": "v"}));
2562        let tr = res.into_tool_result("call_1", "demo");
2563        assert_eq!(
2564            tr.result,
2565            Some(serde_json::json!({"_raw_output_scalar": "v"}))
2566        );
2567        assert_eq!(tr.raw_output, None);
2568    }
2569
2570    #[test]
2571    fn test_echo_tool_display_name() {
2572        let tool = EchoTool;
2573        assert_eq!(tool.display_name(), Some("Echo"));
2574
2575        let def = tool.to_definition();
2576        assert_eq!(def.display_name(), Some("Echo"));
2577    }
2578
2579    #[test]
2580    fn test_all_default_tools_have_display_names() {
2581        let registry = ToolRegistry::with_defaults();
2582        let definitions = registry.tool_definitions();
2583
2584        for def in &definitions {
2585            assert!(
2586                def.display_name().is_some(),
2587                "Tool '{}' should have a display_name",
2588                def.name()
2589            );
2590        }
2591    }
2592
2593    #[tokio::test]
2594    async fn test_tool_registry_as_executor() {
2595        let mut registry = ToolRegistry::new();
2596        registry.register(EchoTool);
2597
2598        let tool_call = ToolCall {
2599            id: "call_1".to_string(),
2600            name: "echo".to_string(),
2601            arguments: serde_json::json!({"message": "test"}),
2602        };
2603
2604        let tool_def = registry.get("echo").unwrap().to_definition();
2605        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2606
2607        assert!(result.error.is_none());
2608        assert_eq!(result.result.unwrap()["echoed"], "test");
2609    }
2610
2611    #[test]
2612    fn test_tool_to_definition() {
2613        let tool = GetCurrentTimeTool;
2614        let def = tool.to_definition();
2615
2616        let ToolDefinition::Builtin(builtin) = def else {
2617            panic!("expected Builtin variant");
2618        };
2619        assert_eq!(builtin.name, "get_current_time");
2620        assert_eq!(builtin.policy, ToolPolicy::Auto);
2621    }
2622
2623    #[test]
2624    fn test_with_defaults_has_expected_tools() {
2625        let registry = ToolRegistry::with_defaults();
2626
2627        // Core tools
2628        assert!(
2629            registry.has("get_current_time"),
2630            "should have get_current_time"
2631        );
2632        assert!(registry.has("echo"), "should have echo");
2633        // spawn_background is contributed by the background_execution
2634        // capability (auto-activated) — it must NOT be in defaults.
2635        assert!(
2636            !registry.has("spawn_background"),
2637            "spawn_background must NOT be in defaults — it comes from the \
2638             background_execution capability"
2639        );
2640        assert!(
2641            registry.has("report_progress"),
2642            "should have report_progress"
2643        );
2644
2645        // TestMath capability tools
2646        assert!(registry.has("add"), "should have add");
2647        assert!(registry.has("subtract"), "should have subtract");
2648        assert!(registry.has("multiply"), "should have multiply");
2649        assert!(registry.has("divide"), "should have divide");
2650
2651        // TestWeather capability tools
2652        assert!(registry.has("get_weather"), "should have get_weather");
2653        assert!(registry.has("get_forecast"), "should have get_forecast");
2654
2655        // TaskList capability tools
2656        assert!(registry.has("write_todos"), "should have write_todos");
2657
2658        // FileSystem capability tools
2659        assert!(registry.has("read_file"), "should have read_file");
2660        assert!(registry.has("write_file"), "should have write_file");
2661        assert!(registry.has("edit_file"), "should have edit_file");
2662        assert!(registry.has("list_directory"), "should have list_directory");
2663        assert!(registry.has("grep_files"), "should have grep_files");
2664        assert!(registry.has("delete_file"), "should have delete_file");
2665        assert!(registry.has("stat_file"), "should have stat_file");
2666
2667        // WebFetch capability tools
2668        assert!(registry.has("web_fetch"), "should have web_fetch");
2669
2670        // Total count: 19 - 1 (spawn_background, moved to capability) = 18
2671        assert_eq!(registry.len(), 18, "should have 18 default tools");
2672    }
2673
2674    #[tokio::test]
2675    async fn test_with_defaults_tools_are_executable() {
2676        let registry = ToolRegistry::with_defaults();
2677
2678        // Test echo tool execution
2679        let tool_call = ToolCall {
2680            id: "call_1".to_string(),
2681            name: "echo".to_string(),
2682            arguments: serde_json::json!({"message": "hello from defaults"}),
2683        };
2684
2685        let tool_def = registry.get("echo").unwrap().to_definition();
2686        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2687
2688        assert!(result.error.is_none());
2689        assert_eq!(result.result.unwrap()["echoed"], "hello from defaults");
2690    }
2691
2692    #[tokio::test]
2693    async fn test_with_defaults_math_tools() {
2694        let registry = ToolRegistry::with_defaults();
2695
2696        // Test add tool
2697        let tool_call = ToolCall {
2698            id: "call_add".to_string(),
2699            name: "add".to_string(),
2700            arguments: serde_json::json!({"a": 5, "b": 3}),
2701        };
2702
2703        let tool_def = registry.get("add").unwrap().to_definition();
2704        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2705
2706        assert!(result.error.is_none());
2707        // AddTool returns floats, so compare as f64
2708        assert_eq!(result.result.unwrap()["result"].as_f64().unwrap(), 8.0);
2709    }
2710
2711    /// Regression: with_defaults() must NOT include capability-provided tools like
2712    /// 'bash'. These tools come from capabilities and must be registered separately.
2713    /// If bash were in defaults, the harness capability fallback would be masked.
2714    #[test]
2715    fn test_with_defaults_excludes_capability_only_tools() {
2716        let registry = ToolRegistry::with_defaults();
2717
2718        // bash comes from virtual_bash capability, not defaults
2719        assert!(
2720            !registry.has("bash"),
2721            "bash must not be in defaults — it comes from virtual_bash capability"
2722        );
2723        // kv_store/secret_store come from session_storage capability
2724        assert!(
2725            !registry.has("kv_store"),
2726            "kv_store must not be in defaults — it comes from session_storage capability"
2727        );
2728        // spawn_background comes from background_execution capability and is
2729        // auto-activated by `collect_capabilities_with_configs` when a
2730        // background-capable tool is present (see EVE-501).
2731        assert!(
2732            !registry.has("spawn_background"),
2733            "spawn_background must not be in defaults — it comes from the \
2734             background_execution capability (auto-activated by tool hints)"
2735        );
2736    }
2737
2738    #[tokio::test]
2739    async fn test_spawn_background_executes_and_signals_session() {
2740        let session_id = SessionId::new();
2741        let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2742        let file_store = Arc::new(TestFileStore::default());
2743        let platform_store = Arc::new(TestPlatformStore::default());
2744        let storage_store = Arc::new(NoopStorageStore);
2745        let tool_registry = ToolRegistry::builder()
2746            .tool(SpawnBackgroundTool)
2747            .tool(TestBackgroundTool)
2748            .build();
2749
2750        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
2751            .with_tool_registry(Arc::new(tool_registry))
2752            .with_platform_store(platform_store.clone())
2753            .with_session_resource_registry(resource_registry.clone());
2754
2755        let tool = SpawnBackgroundTool;
2756        let result = tool
2757            .execute_with_context(
2758                json!({
2759                    "tool": "test_background",
2760                    "args": { "summary": "Background complete" }
2761                }),
2762                &context,
2763            )
2764            .await;
2765
2766        let ToolExecutionResult::Success(value) = result else {
2767            panic!("spawn_background should succeed");
2768        };
2769        let run_id = value["run_id"].as_str().unwrap().to_string();
2770
2771        tokio::time::timeout(std::time::Duration::from_secs(2), async {
2772            loop {
2773                let entry = resource_registry
2774                    .get(session_id, &run_id)
2775                    .await
2776                    .unwrap()
2777                    .expect("resource exists");
2778                if entry.status == SessionResourceStatus::Completed {
2779                    break entry;
2780                }
2781                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2782            }
2783        })
2784        .await
2785        .expect("background run should complete");
2786
2787        let messages = platform_store.sent_messages.lock().unwrap().clone();
2788        assert_eq!(messages.len(), 1);
2789        assert!(messages[0].contains("Background run completed"));
2790        assert!(messages[0].contains(&run_id));
2791
2792        let log_file = file_store
2793            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
2794            .await
2795            .unwrap()
2796            .expect("log file");
2797        assert!(
2798            log_file
2799                .content
2800                .as_deref()
2801                .unwrap_or_default()
2802                .contains("hello from background")
2803        );
2804    }
2805
2806    #[tokio::test]
2807    async fn test_spawn_background_persists_failure_artifacts() {
2808        let session_id = SessionId::new();
2809        let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2810        let file_store = Arc::new(TestFileStore::default());
2811        let storage_store = Arc::new(NoopStorageStore);
2812        let tool_registry = ToolRegistry::builder()
2813            .tool(SpawnBackgroundTool)
2814            .tool(TestFailingBackgroundTool)
2815            .build();
2816
2817        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
2818            .with_tool_registry(Arc::new(tool_registry))
2819            .with_session_resource_registry(resource_registry.clone());
2820
2821        let result = SpawnBackgroundTool
2822            .execute_with_context(
2823                json!({
2824                    "tool": "test_background_fail",
2825                    "args": {}
2826                }),
2827                &context,
2828            )
2829            .await;
2830
2831        let ToolExecutionResult::Success(value) = result else {
2832            panic!("spawn_background should succeed");
2833        };
2834        let run_id = value["run_id"].as_str().unwrap().to_string();
2835
2836        tokio::time::timeout(std::time::Duration::from_secs(2), async {
2837            loop {
2838                let entry = resource_registry
2839                    .get(session_id, &run_id)
2840                    .await
2841                    .unwrap()
2842                    .expect("resource exists");
2843                if entry.status == SessionResourceStatus::Failed {
2844                    break entry;
2845                }
2846                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2847            }
2848        })
2849        .await
2850        .expect("background run should fail");
2851
2852        let log_file = file_store
2853            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
2854            .await
2855            .unwrap()
2856            .expect("log file");
2857        assert!(
2858            log_file
2859                .content
2860                .as_deref()
2861                .unwrap_or_default()
2862                .contains("background failed")
2863        );
2864
2865        let result_file = file_store
2866            .read_file(session_id, &format!("/.background/{run_id}/result.json"))
2867            .await
2868            .unwrap()
2869            .expect("result file");
2870        let result_json: Value =
2871            serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
2872                .expect("valid json");
2873        assert_eq!(result_json["status"], "failed");
2874        assert_eq!(result_json["error"], "boom");
2875    }
2876
2877    #[tokio::test]
2878    async fn test_spawn_background_rejects_when_session_active_run_limit_reached() {
2879        let session_id = SessionId::new();
2880        let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2881        let file_store = Arc::new(TestFileStore::default());
2882        let storage_store = Arc::new(NoopStorageStore);
2883        let release = StdArc::new(AtomicBool::new(false));
2884        let tool_registry = ToolRegistry::builder()
2885            .tool(SpawnBackgroundTool)
2886            .tool(BlockingBackgroundTool {
2887                release: release.clone(),
2888            })
2889            .build();
2890
2891        let context = ToolContext::with_stores(session_id, file_store, storage_store)
2892            .with_tool_registry(Arc::new(tool_registry))
2893            .with_session_resource_registry(resource_registry.clone());
2894
2895        let mut run_ids = Vec::new();
2896        for _ in 0..MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
2897            let result = SpawnBackgroundTool
2898                .execute_with_context(
2899                    json!({
2900                        "tool": "test_background_blocking",
2901                        "args": {}
2902                    }),
2903                    &context,
2904                )
2905                .await;
2906
2907            let ToolExecutionResult::Success(value) = result else {
2908                panic!("background run below the session limit should start");
2909            };
2910            run_ids.push(value["run_id"].as_str().unwrap().to_string());
2911        }
2912
2913        tokio::time::timeout(std::time::Duration::from_secs(2), async {
2914            loop {
2915                let active_runs = resource_registry
2916                    .list(
2917                        session_id,
2918                        Some(&SessionResourceFilter {
2919                            kind: Some("background_run".to_string()),
2920                            status: Some(SessionResourceStatus::Active),
2921                        }),
2922                    )
2923                    .await
2924                    .unwrap();
2925                if active_runs.len() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
2926                    break;
2927                }
2928                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2929            }
2930        })
2931        .await
2932        .expect("background runs should become active");
2933
2934        let result = SpawnBackgroundTool
2935            .execute_with_context(
2936                json!({
2937                    "tool": "test_background_blocking",
2938                    "args": {}
2939                }),
2940                &context,
2941            )
2942            .await;
2943
2944        let ToolExecutionResult::ToolError(message) = result else {
2945            release.store(true, Ordering::SeqCst);
2946            panic!("spawn_background should reject once the session limit is reached");
2947        };
2948        assert!(message.contains("active background runs per session"));
2949
2950        release.store(true, Ordering::SeqCst);
2951        tokio::time::timeout(std::time::Duration::from_secs(2), async {
2952            for run_id in run_ids {
2953                loop {
2954                    let entry = resource_registry
2955                        .get(session_id, &run_id)
2956                        .await
2957                        .unwrap()
2958                        .expect("resource exists");
2959                    if entry.status == SessionResourceStatus::Completed {
2960                        break;
2961                    }
2962                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
2963                }
2964            }
2965        })
2966        .await
2967        .expect("blocking background runs should complete after release");
2968    }
2969
2970    #[tokio::test]
2971    async fn test_spawn_background_requires_file_store() {
2972        let session_id = SessionId::new();
2973        let resource_registry = Arc::new(TestSessionResourceRegistry::default());
2974        let storage_store = Arc::new(NoopStorageStore);
2975        let tool_registry = ToolRegistry::builder()
2976            .tool(SpawnBackgroundTool)
2977            .tool(TestBackgroundTool)
2978            .build();
2979
2980        let context = ToolContext::with_storage_store(session_id, storage_store)
2981            .with_tool_registry(Arc::new(tool_registry))
2982            .with_session_resource_registry(resource_registry);
2983
2984        let result = SpawnBackgroundTool
2985            .execute_with_context(
2986                json!({
2987                    "tool": "test_background",
2988                    "args": {}
2989                }),
2990                &context,
2991            )
2992            .await;
2993
2994        let ToolExecutionResult::ToolError(message) = result else {
2995            panic!("spawn_background should reject missing file store");
2996        };
2997        assert!(message.contains("Session file store not available"));
2998    }
2999
3000    #[tokio::test]
3001    async fn test_spawn_background_caps_output_log_size() {
3002        let session_id = SessionId::new();
3003        let resource_registry = Arc::new(TestSessionResourceRegistry::default());
3004        let file_store = Arc::new(TestFileStore::default());
3005        let storage_store = Arc::new(NoopStorageStore);
3006        let tool_registry = ToolRegistry::builder()
3007            .tool(SpawnBackgroundTool)
3008            .tool(TestLargeOutputBackgroundTool)
3009            .build();
3010
3011        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3012            .with_tool_registry(Arc::new(tool_registry))
3013            .with_session_resource_registry(resource_registry.clone());
3014
3015        let result = SpawnBackgroundTool
3016            .execute_with_context(
3017                json!({
3018                    "tool": "test_background_large_output",
3019                    "args": {}
3020                }),
3021                &context,
3022            )
3023            .await;
3024
3025        let ToolExecutionResult::Success(value) = result else {
3026            panic!("spawn_background should succeed");
3027        };
3028        let run_id = value["run_id"].as_str().unwrap().to_string();
3029
3030        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3031            loop {
3032                let entry = resource_registry
3033                    .get(session_id, &run_id)
3034                    .await
3035                    .unwrap()
3036                    .expect("resource exists");
3037                if entry.status == SessionResourceStatus::Completed {
3038                    break;
3039                }
3040                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3041            }
3042        })
3043        .await
3044        .expect("background run should complete");
3045
3046        let log_content = file_store
3047            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3048            .await
3049            .unwrap()
3050            .expect("log file")
3051            .content
3052            .unwrap_or_default();
3053
3054        assert!(log_content.contains("[system] background output truncated"));
3055        assert!(log_content.chars().count() <= MAX_BACKGROUND_OUTPUT_LOG_CHARS + 128);
3056    }
3057
3058    #[tokio::test]
3059    async fn test_spawn_background_can_create_scheduled_monitor() {
3060        let session_id = SessionId::new();
3061        let schedule_store = Arc::new(TestScheduleStore::default());
3062        let storage_store = Arc::new(NoopStorageStore);
3063        let tool_registry = ToolRegistry::builder()
3064            .tool(SpawnBackgroundTool)
3065            .tool(TestBackgroundTool)
3066            .build();
3067
3068        let context = ToolContext::with_storage_store(session_id, storage_store)
3069            .with_tool_registry(Arc::new(tool_registry))
3070            .with_schedule_store(schedule_store.clone());
3071
3072        let result = SpawnBackgroundTool
3073            .execute_with_context(
3074                json!({
3075                    "tool": "test_background",
3076                    "title": "Watch PR 1319",
3077                    "args": { "summary": "Background complete" },
3078                    "schedule": {
3079                        "cron_expression": "*/10 * * * *",
3080                        "timezone": "America/Chicago"
3081                    }
3082                }),
3083                &context,
3084            )
3085            .await;
3086
3087        let ToolExecutionResult::Success(value) = result else {
3088            panic!("spawn_background should create a schedule: {result:?}");
3089        };
3090
3091        assert_eq!(value["status"], "scheduled");
3092        assert_eq!(value["title"], "Watch PR 1319");
3093        assert_eq!(value["cron_expression"], "*/10 * * * *");
3094        assert_eq!(value["timezone"], "America/Chicago");
3095
3096        let schedules = schedule_store.list_schedules(session_id).await.unwrap();
3097        assert_eq!(schedules.len(), 1);
3098        assert_eq!(
3099            schedules[0].cron_expression.as_deref(),
3100            Some("*/10 * * * *")
3101        );
3102        assert!(schedules[0].description.contains("Monitor: Watch PR 1319"));
3103        assert!(
3104            schedules[0]
3105                .description
3106                .contains("\"summary\": \"Background complete\"")
3107        );
3108    }
3109
3110    #[tokio::test]
3111    async fn test_spawn_background_rejects_invalid_scheduled_at() {
3112        let session_id = SessionId::new();
3113        let storage_store = Arc::new(NoopStorageStore);
3114        let tool_registry = ToolRegistry::builder()
3115            .tool(SpawnBackgroundTool)
3116            .tool(TestBackgroundTool)
3117            .build();
3118        let context = ToolContext::with_storage_store(session_id, storage_store)
3119            .with_tool_registry(Arc::new(tool_registry));
3120
3121        let result = SpawnBackgroundTool
3122            .execute_with_context(
3123                json!({
3124                    "tool": "test_background",
3125                    "args": {},
3126                    "schedule": {
3127                        "scheduled_at": "tomorrow at noon"
3128                    }
3129                }),
3130                &context,
3131            )
3132            .await;
3133
3134        let ToolExecutionResult::ToolError(message) = result else {
3135            panic!("spawn_background should reject invalid scheduled_at");
3136        };
3137        assert!(message.contains("scheduled_at must be RFC3339"));
3138    }
3139
3140    #[tokio::test]
3141    async fn test_spawn_background_rejects_ambiguous_schedule_shape() {
3142        let session_id = SessionId::new();
3143        let storage_store = Arc::new(NoopStorageStore);
3144        let tool_registry = ToolRegistry::builder()
3145            .tool(SpawnBackgroundTool)
3146            .tool(TestBackgroundTool)
3147            .build();
3148        let context = ToolContext::with_storage_store(session_id, storage_store)
3149            .with_tool_registry(Arc::new(tool_registry));
3150
3151        let result = SpawnBackgroundTool
3152            .execute_with_context(
3153                json!({
3154                    "tool": "test_background",
3155                    "args": {},
3156                    "schedule": {
3157                        "cron_expression": "*/10 * * * *",
3158                        "scheduled_at": "2026-04-16T15:30:00Z"
3159                    }
3160                }),
3161                &context,
3162            )
3163            .await;
3164
3165        let ToolExecutionResult::ToolError(message) = result else {
3166            panic!("spawn_background should reject ambiguous schedule shape");
3167        };
3168        assert!(message.contains("must not include both cron_expression and scheduled_at"));
3169    }
3170}