Skip to main content

everruns_core/
tools.rs

1// Tool Abstraction for Agent Loop
2//
3// This module provides a high-level abstraction for tools that can be executed
4// by the agent loop. Tools are defined using the `Tool` trait and can be
5// registered with a `ToolRegistry` for use in the loop.
6//
7// Design decisions:
8// - Tools are defined via a trait for flexibility (function-style tools)
9// - ToolRegistry implements ToolExecutor for integration with the agent loop
10// - Error handling distinguishes between user-visible errors and internal errors
11// - Internal errors are logged but not exposed to the LLM (security)
12
13use async_trait::async_trait;
14use serde::{Deserialize, Serialize};
15use serde_json::{Value, json};
16use std::collections::HashMap;
17use std::sync::{Arc, OnceLock};
18use tracing::error;
19
20use crate::background::{
21    BackgroundEventSink, BackgroundExecutableTool, BackgroundOutcome, BackgroundProgress,
22};
23use crate::session_schedule::MAX_ACTIVE_SCHEDULES_PER_SESSION;
24use crate::tool_types::{
25    BuiltinTool, DeferrablePolicy, ToolCall, ToolDefinition, ToolHints, ToolPolicy, ToolResult,
26};
27use crate::traits::ToolContext;
28use crate::typed_id::SessionId;
29use tokio::sync::{OwnedSemaphorePermit, Semaphore};
30
31use crate::error::Result;
32use crate::traits::ToolExecutor;
33
34/// Maximum active immediate background runs allowed for a single session.
35///
36/// This mirrors the scheduled monitor cap so model-visible background execution
37/// cannot be used to queue unbounded active worker jobs for one session.
38pub const MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION: usize = 5;
39
40/// Maximum active immediate background runs allowed in this worker process.
41///
42/// The per-session semaphore limits tenant/session abuse; this process-wide
43/// semaphore keeps concurrent sessions from exhausting worker-local resources.
44const MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER: usize = 64;
45static ACTIVE_BACKGROUND_RUNS_PER_WORKER: Semaphore =
46    Semaphore::const_new(MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER);
47
48/// Per-session semaphores that enforce `MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION`.
49///
50/// Using a semaphore rather than a DB count makes the check atomic: concurrent
51/// `spawn_background` calls for the same session cannot both slip through the
52/// guard (check-then-act race) because `try_acquire` is inherently atomic.
53static SESSION_BACKGROUND_PERMITS: OnceLock<std::sync::Mutex<HashMap<SessionId, Arc<Semaphore>>>> =
54    OnceLock::new();
55
56struct SessionBackgroundPermit {
57    session_id: SessionId,
58    semaphore: Arc<Semaphore>,
59    permit: Option<OwnedSemaphorePermit>,
60}
61
62impl Drop for SessionBackgroundPermit {
63    fn drop(&mut self) {
64        drop(self.permit.take());
65
66        let Some(permits) = SESSION_BACKGROUND_PERMITS.get() else {
67            return;
68        };
69        let mut permits = permits.lock().unwrap();
70        let should_remove = permits.get(&self.session_id).is_some_and(|current| {
71            Arc::ptr_eq(current, &self.semaphore)
72                && self.semaphore.available_permits() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION
73                && Arc::strong_count(&self.semaphore) == 2
74        });
75        if should_remove {
76            permits.remove(&self.session_id);
77        }
78    }
79}
80
81fn try_acquire_session_background_permit(
82    session_id: SessionId,
83) -> std::result::Result<SessionBackgroundPermit, tokio::sync::TryAcquireError> {
84    let permits = SESSION_BACKGROUND_PERMITS.get_or_init(Default::default);
85    let mut permits = permits.lock().unwrap();
86    let semaphore = permits
87        .entry(session_id)
88        .or_insert_with(|| Arc::new(Semaphore::new(MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION)))
89        .clone();
90    let permit = semaphore.clone().try_acquire_owned()?;
91
92    Ok(SessionBackgroundPermit {
93        session_id,
94        semaphore,
95        permit: Some(permit),
96    })
97}
98
99#[cfg(test)]
100fn has_session_background_permits(session_id: SessionId) -> bool {
101    SESSION_BACKGROUND_PERMITS
102        .get()
103        .and_then(|permits| permits.lock().unwrap().get(&session_id).cloned())
104        .is_some()
105}
106
107// ============================================================================
108// Tool Execution Result - Error Handling Contract
109// ============================================================================
110
111/// Image data returned by a tool alongside text results.
112///
113/// This allows tools (built-in or MCP) to return images that are sent
114/// to the LLM as native image content blocks, not stringified JSON.
115#[derive(Debug, Clone, Serialize, Deserialize)]
116pub struct ToolResultImage {
117    /// Base64-encoded image data
118    pub base64: String,
119    /// MIME type (e.g., "image/png", "image/jpeg")
120    pub media_type: String,
121}
122
123/// Result of a tool execution.
124///
125/// This enum distinguishes between different outcomes:
126/// - `Success`: Tool executed successfully, result is returned to LLM
127/// - `SuccessWithImages`: Successful execution with JSON result plus images
128/// - `ToolError`: Tool-level error that should be shown to the LLM
129///   (e.g., "City not found", "Invalid date format")
130/// - `InternalError`: System-level error that should NOT be exposed to the LLM
131///   (e.g., database connection failure, API key issues)
132///
133/// # Security
134///
135/// Internal errors are logged but replaced with a generic message when
136/// returned to the LLM. This prevents leaking sensitive information like
137/// database errors, API keys, or internal system details.
138#[derive(Debug)]
139pub enum ToolExecutionResult {
140    /// Successful execution with a JSON result
141    Success(Value),
142
143    /// Successful execution with a JSON result and images.
144    /// Images are sent to the LLM as native image content blocks
145    /// (not stringified JSON), enabling visual understanding.
146    SuccessWithImages {
147        result: Value,
148        images: Vec<ToolResultImage>,
149    },
150
151    /// Tool-level error that is safe to show to the LLM
152    ///
153    /// Use this for expected error conditions that the LLM should know about,
154    /// such as validation errors, resource not found, etc.
155    ToolError(String),
156
157    /// Internal/system error that should NOT be exposed to the LLM
158    ///
159    /// Use this for unexpected errors like network failures, database errors,
160    /// or other internal issues. The error details will be logged but replaced
161    /// with a generic message when returned to the LLM.
162    InternalError(ToolInternalError),
163
164    /// A user connection is required to execute this tool.
165    ///
166    /// Instead of returning an error, this signals that the workflow should
167    /// pause and ask the client to set up a connection for the given provider.
168    /// The UI renders an inline connection dialog; once the user saves (or
169    /// cancels), a tool result is submitted and execution resumes.
170    ConnectionRequired {
171        /// Connection provider id (e.g. "daytona", "brave_search")
172        provider: String,
173    },
174}
175
176impl ToolExecutionResult {
177    /// Create a successful result
178    pub fn success(value: impl Into<Value>) -> Self {
179        ToolExecutionResult::Success(value.into())
180    }
181
182    /// Create a successful result with pre-truncation raw output for VFS persistence.
183    /// The raw output is transferred to `ToolResult.raw_output` during `into_tool_result()`.
184    pub fn success_with_raw_output(value: impl Into<Value>, raw_output: String) -> Self {
185        let mut value = value.into();
186        // Embed raw output in a sidecar key — extracted in into_tool_result().
187        // Non-object values are wrapped in a scalar carrier so raw_output still
188        // flows through; the carrier is unwrapped on extraction.
189        match value.as_object_mut() {
190            Some(obj) => {
191                obj.insert("_raw_output".to_string(), Value::String(raw_output));
192            }
193            None => {
194                value = serde_json::json!({
195                    "_raw_output_scalar": value,
196                    "_raw_output": raw_output,
197                });
198            }
199        }
200        ToolExecutionResult::Success(value)
201    }
202
203    /// Create a successful result with images
204    pub fn success_with_images(value: impl Into<Value>, images: Vec<ToolResultImage>) -> Self {
205        ToolExecutionResult::SuccessWithImages {
206            result: value.into(),
207            images,
208        }
209    }
210
211    /// Create a tool-level error (safe to show to LLM)
212    pub fn tool_error(message: impl Into<String>) -> Self {
213        ToolExecutionResult::ToolError(message.into())
214    }
215
216    /// Create an internal error (will be hidden from LLM)
217    pub fn internal_error(error: impl std::error::Error + Send + Sync + 'static) -> Self {
218        ToolExecutionResult::InternalError(ToolInternalError::new(error))
219    }
220
221    /// Create an internal error from a string message
222    pub fn internal_error_msg(message: impl Into<String>) -> Self {
223        ToolExecutionResult::InternalError(ToolInternalError::from_message(message))
224    }
225
226    /// Signal that a user connection is required before this tool can execute.
227    pub fn connection_required(provider: impl Into<String>) -> Self {
228        ToolExecutionResult::ConnectionRequired {
229            provider: provider.into(),
230        }
231    }
232
233    /// Check if this is a successful result
234    pub fn is_success(&self) -> bool {
235        matches!(
236            self,
237            ToolExecutionResult::Success(_) | ToolExecutionResult::SuccessWithImages { .. }
238        )
239    }
240
241    /// Check if this is an error (either tool error or internal error)
242    pub fn is_error(&self) -> bool {
243        matches!(
244            self,
245            ToolExecutionResult::ToolError(_) | ToolExecutionResult::InternalError(_)
246        )
247    }
248
249    /// Check if this requires a user connection setup
250    pub fn is_connection_required(&self) -> bool {
251        matches!(self, ToolExecutionResult::ConnectionRequired { .. })
252    }
253
254    /// Convert to a ToolResult for the agent loop
255    ///
256    /// Both tool errors and internal errors are packaged as `{"error": "..."}` in the
257    /// result field. This provides a consistent contract where the result field always
258    /// contains the payload, and the agent loop continues the same way for all outcomes.
259    ///
260    /// Internal errors are logged but replaced with a generic message when returned.
261    pub fn into_tool_result(self, tool_call_id: &str, tool_name: &str) -> ToolResult {
262        match self {
263            ToolExecutionResult::Success(mut value) => {
264                // Extract sidecar raw output if present (from success_with_raw_output)
265                let raw_output = value
266                    .as_object_mut()
267                    .and_then(|obj| obj.remove("_raw_output"))
268                    .and_then(|v| v.as_str().map(|s| s.to_string()));
269                // Unwrap scalar carrier only when it matches the exact wrapper shape
270                // set by success_with_raw_output for non-object inputs.
271                let result_value = if let Some(obj) = value.as_object_mut() {
272                    let is_scalar_carrier = raw_output.is_some()
273                        && obj.len() == 1
274                        && obj.contains_key("_raw_output_scalar");
275                    if is_scalar_carrier {
276                        obj.remove("_raw_output_scalar").unwrap_or(Value::Null)
277                    } else {
278                        value
279                    }
280                } else {
281                    value
282                };
283                ToolResult {
284                    tool_call_id: tool_call_id.to_string(),
285                    result: Some(result_value),
286                    images: None,
287                    error: None,
288                    connection_required: None,
289                    raw_output,
290                }
291            }
292            ToolExecutionResult::SuccessWithImages { result, images } => ToolResult {
293                tool_call_id: tool_call_id.to_string(),
294                result: Some(result),
295                images: if images.is_empty() {
296                    None
297                } else {
298                    Some(images)
299                },
300                error: None,
301                connection_required: None,
302                raw_output: None,
303            },
304            ToolExecutionResult::ToolError(message) => ToolResult {
305                tool_call_id: tool_call_id.to_string(),
306                result: Some(serde_json::json!({ "error": &message })),
307                images: None,
308                error: Some(message),
309                connection_required: None,
310                raw_output: None,
311            },
312            ToolExecutionResult::InternalError(err) => {
313                // Log the full error details for debugging
314                error!(
315                    tool_name = %tool_name,
316                    tool_call_id = %tool_call_id,
317                    error = %err.message,
318                    error_chain = %err.chain_string(),
319                    "Tool internal error (details hidden from LLM)"
320                );
321
322                // Return generic error message to LLM, packaged as {"error": "..."}
323                let generic_msg = "An internal error occurred while executing the tool";
324                ToolResult {
325                    tool_call_id: tool_call_id.to_string(),
326                    result: Some(serde_json::json!({
327                        "error": generic_msg
328                    })),
329                    images: None,
330                    error: Some(generic_msg.to_string()),
331                    connection_required: None,
332                    raw_output: None,
333                }
334            }
335            ToolExecutionResult::ConnectionRequired { ref provider } => ToolResult {
336                tool_call_id: tool_call_id.to_string(),
337                result: Some(serde_json::json!({
338                    "connection_required": provider,
339                })),
340                images: None,
341                error: None,
342                connection_required: Some(provider.clone()),
343                raw_output: None,
344            },
345        }
346    }
347}
348
349/// Internal error details (logged but not exposed to LLM)
350#[derive(Debug)]
351pub struct ToolInternalError {
352    /// Error message for logging
353    pub message: String,
354    /// Optional source error
355    pub source: Option<Box<dyn std::error::Error + Send + Sync>>,
356}
357
358impl ToolInternalError {
359    /// Create from an error
360    pub fn new(error: impl std::error::Error + Send + Sync + 'static) -> Self {
361        Self {
362            message: error.to_string(),
363            source: Some(Box::new(error)),
364        }
365    }
366
367    /// Create from a string message
368    pub fn from_message(message: impl Into<String>) -> Self {
369        Self {
370            message: message.into(),
371            source: None,
372        }
373    }
374
375    pub fn chain_string(&self) -> String {
376        let mut parts = vec![self.message.clone()];
377        let mut current = <Self as std::error::Error>::source(self);
378        while let Some(source) = current {
379            let message = source.to_string();
380            if parts.last() != Some(&message) {
381                parts.push(message);
382            }
383            current = source.source();
384        }
385        parts.join(": ")
386    }
387}
388
389impl std::fmt::Display for ToolInternalError {
390    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
391        write!(f, "{}", self.message)
392    }
393}
394
395impl std::error::Error for ToolInternalError {
396    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
397        self.source
398            .as_ref()
399            .map(|e| e.as_ref() as &(dyn std::error::Error + 'static))
400    }
401}
402
403// ============================================================================
404// Tool Trait - Core Tool Abstraction
405// ============================================================================
406
407/// Trait for implementing tools that can be executed by the agent loop.
408///
409/// # Example
410///
411/// ```ignore
412/// use async_trait::async_trait;
413/// use serde_json::{json, Value};
414///
415/// struct GetCurrentTime;
416///
417/// #[async_trait]
418/// impl Tool for GetCurrentTime {
419///     fn name(&self) -> &str {
420///         "get_current_time"
421///     }
422///
423///     fn description(&self) -> &str {
424///         "Get the current date and time"
425///     }
426///
427///     fn parameters_schema(&self) -> Value {
428///         json!({
429///             "type": "object",
430///             "properties": {
431///                 "timezone": {
432///                     "type": "string",
433///                     "description": "Timezone (e.g., 'UTC', 'America/New_York')"
434///                 }
435///             }
436///         })
437///     }
438///
439///     async fn execute(&self, arguments: Value) -> ToolExecutionResult {
440///         let timezone = arguments.get("timezone")
441///             .and_then(|v| v.as_str())
442///             .unwrap_or("UTC");
443///
444///         ToolExecutionResult::success(json!({
445///             "current_time": chrono::Utc::now().to_rfc3339(),
446///             "timezone": timezone
447///         }))
448///     }
449/// }
450/// ```
451#[async_trait]
452pub trait Tool: Send + Sync {
453    /// Returns the tool's unique name.
454    ///
455    /// This name is used by the LLM to invoke the tool and must be unique
456    /// within a ToolRegistry.
457    fn name(&self) -> &str;
458
459    /// Returns a human-readable display name for UI rendering.
460    ///
461    /// This name is shown to users in the UI instead of the technical tool name.
462    /// For example, "Get Current Time" instead of "get_current_time".
463    /// Returns None if no display name is set, in which case the UI may
464    /// fall back to the technical name.
465    fn display_name(&self) -> Option<&str> {
466        None
467    }
468
469    /// Returns a description of what the tool does.
470    ///
471    /// This description is provided to the LLM to help it understand
472    /// when and how to use the tool.
473    fn description(&self) -> &str;
474
475    /// Returns the JSON schema for the tool's parameters.
476    ///
477    /// This schema follows the JSON Schema specification and describes
478    /// the expected arguments for the tool. The LLM uses this to
479    /// generate valid tool calls.
480    fn parameters_schema(&self) -> Value;
481
482    /// Execute the tool with the given arguments.
483    ///
484    /// # Arguments
485    ///
486    /// * `arguments` - The arguments passed to the tool as a JSON value.
487    ///   These should conform to the schema returned by `parameters_schema()`.
488    ///
489    /// # Returns
490    ///
491    /// A `ToolExecutionResult` indicating success, tool error, or internal error.
492    async fn execute(&self, arguments: Value) -> ToolExecutionResult;
493
494    /// Execute the tool with context.
495    ///
496    /// This method provides access to runtime context like session ID and
497    /// optional stores (file store, etc.). Override this method for tools
498    /// that need access to session context or external resources.
499    ///
500    /// The default implementation simply calls `execute()`, ignoring the context.
501    ///
502    /// # Arguments
503    ///
504    /// * `arguments` - The arguments passed to the tool as a JSON value.
505    /// * `context` - Runtime context containing session ID and optional stores.
506    ///
507    /// # Returns
508    ///
509    /// A `ToolExecutionResult` indicating success, tool error, or internal error.
510    async fn execute_with_context(
511        &self,
512        arguments: Value,
513        _context: &ToolContext,
514    ) -> ToolExecutionResult {
515        // Default: delegate to execute(), ignoring context
516        self.execute(arguments).await
517    }
518
519    /// Returns true if this tool requires context for execution.
520    ///
521    /// Tools that need session context (like filesystem tools) should
522    /// override this to return true.
523    fn requires_context(&self) -> bool {
524        false
525    }
526
527    /// Returns the tool policy (auto or requires_approval).
528    ///
529    /// Default is `Auto` which means the tool executes immediately.
530    /// Override to return `RequiresApproval` for sensitive operations.
531    fn policy(&self) -> ToolPolicy {
532        ToolPolicy::Auto
533    }
534
535    /// Returns semantic hints describing the tool's behavioral properties.
536    ///
537    /// Override to provide hints like readonly, destructive, idempotent, etc.
538    /// Default is empty (all hints unspecified).
539    fn hints(&self) -> ToolHints {
540        ToolHints::default()
541    }
542
543    /// Returns backend-authored narration for a call to this tool, e.g.
544    /// "Read AGENTS.md".
545    ///
546    /// The owning capability's default [`crate::capabilities::Capability::narrate`]
547    /// dispatches here for the tool whose `name()` matches the call. Return
548    /// `None` to accept the generic `narration_noun`/display-name fallback.
549    /// Implementations should use the phrasing helpers in
550    /// [`crate::tool_narration`] (`narrate_read_file`, `narrate_shell_exec`, …)
551    /// so wording and localization stay consistent.
552    fn narrate(
553        &self,
554        _tool_call: &crate::tool_types::ToolCall,
555        _phase: crate::tool_narration::ToolNarrationPhase,
556        _locale: Option<&str>,
557    ) -> Option<String> {
558        None
559    }
560
561    /// Returns native background execution support when this tool opts into
562    /// detached execution via `hints().supports_background`.
563    fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
564        None
565    }
566
567    /// Deferral policy for progressive tool-schema disclosure (tool search).
568    /// Hot-path or "consult-first" tools can return [`DeferrablePolicy::Never`]
569    /// to always keep their full schema directly callable. Defaults to
570    /// [`DeferrablePolicy::Automatic`].
571    fn deferrable_policy(&self) -> DeferrablePolicy {
572        DeferrablePolicy::default()
573    }
574
575    /// Convert this tool to a ToolDefinition for the agent config.
576    ///
577    /// This is used by ToolRegistry to generate tool definitions
578    /// for the LLM provider.
579    fn to_definition(&self) -> ToolDefinition {
580        ToolDefinition::Builtin(BuiltinTool {
581            name: self.name().to_string(),
582            display_name: self.display_name().map(|s| s.to_string()),
583            description: self.description().to_string(),
584            parameters: self.parameters_schema(),
585            policy: self.policy(),
586            category: None,
587            deferrable: self.deferrable_policy(),
588            hints: self.hints(),
589            full_parameters: None,
590        })
591    }
592}
593
594// ============================================================================
595// ToolRegistry - Collection of Tools
596// ============================================================================
597
598/// A registry that holds multiple tools and implements ToolExecutor.
599///
600/// ToolRegistry provides a convenient way to manage multiple tools and
601/// integrate them with the agent loop. It implements `ToolExecutor` so
602/// it can be used directly with `AgentLoop`.
603///
604/// # Example
605///
606/// ```ignore
607/// use everruns_core::tools::{Tool, ToolRegistry};
608///
609/// // Create registry and add tools
610/// let mut registry = ToolRegistry::new();
611/// registry.register(Box::new(GetCurrentTime));
612/// registry.register(Box::new(GetWeather));
613///
614/// // Get tool definitions for agent config
615/// let definitions = registry.tool_definitions();
616///
617/// // Use with agent loop
618/// let agent_loop = AgentLoop::new(config, emitter, store, llm, registry);
619/// ```
620#[derive(Default, Clone)]
621pub struct ToolRegistry {
622    tools: HashMap<String, Arc<dyn Tool>>,
623}
624
625impl ToolRegistry {
626    /// Create a new empty tool registry
627    pub fn new() -> Self {
628        Self {
629            tools: HashMap::new(),
630        }
631    }
632
633    /// Create a tool registry with default built-in tools.
634    ///
635    /// This includes:
636    /// - `get_current_time`: Returns the current date and time
637    /// - `echo`: Echoes back the provided message
638    /// - `report_progress`: Emits deterministic external progress updates
639    /// - TestMath tools: add, subtract, multiply, divide
640    /// - TestWeather tools: get_weather, get_forecast
641    /// - TaskList tools: write_todos
642    /// - FileSystem tools: read_file, write_file, edit_file, list_directory, grep_files, delete_file, stat_file
643    /// - WebFetch tools: web_fetch
644    pub fn with_defaults() -> Self {
645        use crate::capabilities::{
646            AddTool, DeleteFileTool, DivideTool, EditFileTool, GetCurrentTimeTool, GetForecastTool,
647            GetWeatherTool, GrepFilesTool, ListDirectoryTool, MultiplyTool, ReadFileTool,
648            StatFileTool, SubtractTool, WebFetchTool, WriteFileTool, WriteTodosTool,
649        };
650        use crate::progress_reporting::ReportProgressTool;
651
652        ToolRegistry::builder()
653            .tool(GetCurrentTimeTool)
654            .tool(EchoTool)
655            // NOTE: `spawn_background` is intentionally NOT a default tool —
656            // it is contributed by the `background_execution` capability,
657            // which is auto-activated by
658            // `collect_capabilities_with_configs` whenever a collected tool
659            // declares `ToolHints::supports_background = Some(true)`. Keeping
660            // it out of defaults preserves the lockstep contract between
661            // model-visible tools and the worker execution registry: the
662            // executor only knows about `spawn_background` when the model
663            // can also see it.
664            .tool(ReportProgressTool)
665            // TestMath capability tools
666            .tool(AddTool)
667            .tool(SubtractTool)
668            .tool(MultiplyTool)
669            .tool(DivideTool)
670            // TestWeather capability tools
671            .tool(GetWeatherTool)
672            .tool(GetForecastTool)
673            // TaskList capability tools
674            .tool(WriteTodosTool)
675            // FileSystem capability tools
676            .tool(ReadFileTool)
677            .tool(WriteFileTool)
678            .tool(EditFileTool)
679            .tool(ListDirectoryTool)
680            .tool(GrepFilesTool)
681            .tool(DeleteFileTool)
682            .tool(StatFileTool)
683            // WebFetch capability tools
684            .tool(WebFetchTool::default())
685            .build()
686    }
687
688    /// Register a tool with the registry.
689    ///
690    /// If a tool with the same name already exists, it will be replaced.
691    pub fn register(&mut self, tool: impl Tool + 'static) {
692        self.tools.insert(tool.name().to_string(), Arc::new(tool));
693    }
694
695    /// Register a boxed tool
696    pub fn register_boxed(&mut self, tool: Box<dyn Tool>) {
697        self.tools.insert(tool.name().to_string(), Arc::from(tool));
698    }
699
700    /// Register an Arc-wrapped tool
701    pub fn register_arc(&mut self, tool: Arc<dyn Tool>) {
702        self.tools.insert(tool.name().to_string(), tool);
703    }
704
705    /// Get a tool by name
706    pub fn get(&self, name: &str) -> Option<&Arc<dyn Tool>> {
707        self.tools.get(name)
708    }
709
710    /// Check if a tool is registered
711    pub fn has(&self, name: &str) -> bool {
712        self.tools.contains_key(name)
713    }
714
715    /// Get the number of registered tools
716    pub fn len(&self) -> usize {
717        self.tools.len()
718    }
719
720    /// Check if the registry is empty
721    pub fn is_empty(&self) -> bool {
722        self.tools.is_empty()
723    }
724
725    /// Get all tool names
726    pub fn tool_names(&self) -> Vec<&str> {
727        self.tools.keys().map(|s| s.as_str()).collect()
728    }
729
730    /// Get tool definitions for use in RuntimeAgent.
731    ///
732    /// Returns a Vec of ToolDefinition that can be passed to
733    /// `RuntimeAgent::with_tools()`.
734    pub fn tool_definitions(&self) -> Vec<ToolDefinition> {
735        self.tools.values().map(|t| t.to_definition()).collect()
736    }
737
738    /// Remove a tool from the registry
739    pub fn unregister(&mut self, name: &str) -> Option<Arc<dyn Tool>> {
740        self.tools.remove(name)
741    }
742
743    /// Clear all tools from the registry
744    pub fn clear(&mut self) {
745        self.tools.clear();
746    }
747
748    /// Create a builder for fluent tool registration
749    pub fn builder() -> ToolRegistryBuilder {
750        ToolRegistryBuilder::new()
751    }
752}
753
754impl std::fmt::Debug for ToolRegistry {
755    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
756        f.debug_struct("ToolRegistry")
757            .field("tools", &self.tool_names())
758            .finish()
759    }
760}
761
762#[async_trait]
763impl ToolExecutor for ToolRegistry {
764    async fn execute(
765        &self,
766        tool_call: &ToolCall,
767        _tool_def: &ToolDefinition,
768    ) -> Result<ToolResult> {
769        let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
770            crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
771        })?;
772
773        let result = tool.execute(tool_call.arguments.clone()).await;
774        Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
775    }
776
777    async fn execute_with_context(
778        &self,
779        tool_call: &ToolCall,
780        _tool_def: &ToolDefinition,
781        context: &ToolContext,
782    ) -> Result<ToolResult> {
783        let tool = self.tools.get(&tool_call.name).ok_or_else(|| {
784            crate::error::AgentLoopError::tool(format!("Tool not found: {}", tool_call.name))
785        })?;
786
787        // Use execute_with_context for all tools - context-aware tools will use it,
788        // regular tools will delegate to execute() via the default implementation
789        let result = tool
790            .execute_with_context(tool_call.arguments.clone(), context)
791            .await;
792        Ok(result.into_tool_result(&tool_call.id, &tool_call.name))
793    }
794}
795
796// ============================================================================
797// ToolRegistryBuilder - Fluent API for Building Registry
798// ============================================================================
799
800/// Builder for creating a ToolRegistry with a fluent API.
801///
802/// # Example
803///
804/// ```ignore
805/// let registry = ToolRegistry::builder()
806///     .tool(GetCurrentTime)
807///     .tool(GetWeather)
808///     .build();
809/// ```
810pub struct ToolRegistryBuilder {
811    registry: ToolRegistry,
812}
813
814impl ToolRegistryBuilder {
815    /// Create a new builder
816    pub fn new() -> Self {
817        Self {
818            registry: ToolRegistry::new(),
819        }
820    }
821
822    /// Add a tool to the registry
823    pub fn tool(mut self, tool: impl Tool + 'static) -> Self {
824        self.registry.register(tool);
825        self
826    }
827
828    /// Add a boxed tool to the registry
829    pub fn tool_boxed(mut self, tool: Box<dyn Tool>) -> Self {
830        self.registry.register_boxed(tool);
831        self
832    }
833
834    /// Add an Arc-wrapped tool to the registry
835    pub fn tool_arc(mut self, tool: Arc<dyn Tool>) -> Self {
836        self.registry.register_arc(tool);
837        self
838    }
839
840    /// Build the registry
841    pub fn build(self) -> ToolRegistry {
842        self.registry
843    }
844}
845
846impl Default for ToolRegistryBuilder {
847    fn default() -> Self {
848        Self::new()
849    }
850}
851
852// ============================================================================
853// Built-in Tools
854// ============================================================================
855
856/// A tool that echoes back its arguments (useful for testing)
857pub struct EchoTool;
858
859#[async_trait]
860impl Tool for EchoTool {
861    fn name(&self) -> &str {
862        "echo"
863    }
864
865    fn display_name(&self) -> Option<&str> {
866        Some("Echo")
867    }
868
869    fn description(&self) -> &str {
870        "Echo back the provided message. Useful for testing tool execution."
871    }
872
873    fn parameters_schema(&self) -> Value {
874        serde_json::json!({
875            "type": "object",
876            "properties": {
877                "message": {
878                    "type": "string",
879                    "description": "The message to echo back"
880                }
881            },
882            "required": ["message"],
883            "additionalProperties": false
884        })
885    }
886
887    fn hints(&self) -> ToolHints {
888        ToolHints::default()
889            .with_readonly(true)
890            .with_idempotent(true)
891    }
892
893    async fn execute(&self, arguments: Value) -> ToolExecutionResult {
894        let message = arguments
895            .get("message")
896            .and_then(|v| v.as_str())
897            .unwrap_or("");
898
899        ToolExecutionResult::success(serde_json::json!({
900            "echoed": message,
901            "length": message.len()
902        }))
903    }
904}
905
906/// Spawn a background-capable tool and return immediately with a run handle.
907pub struct SpawnBackgroundTool;
908
909#[derive(Debug, Clone)]
910struct BackgroundScheduleRequest {
911    cron_expression: Option<String>,
912    scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
913    timezone: String,
914}
915
916fn parse_background_schedule(
917    arguments: &Value,
918) -> std::result::Result<Option<BackgroundScheduleRequest>, String> {
919    let Some(schedule) = arguments.get("schedule") else {
920        return Ok(None);
921    };
922    let Some(schedule) = schedule.as_object() else {
923        return Err("schedule must be an object".to_string());
924    };
925
926    let cron_expression = schedule
927        .get("cron_expression")
928        .and_then(Value::as_str)
929        .map(str::trim)
930        .filter(|value| !value.is_empty())
931        .map(ToString::to_string);
932    let scheduled_at = match schedule.get("scheduled_at").and_then(Value::as_str) {
933        Some(value) => {
934            let value = value.trim();
935            if value.is_empty() {
936                None
937            } else {
938                Some(
939                    chrono::DateTime::parse_from_rfc3339(value)
940                        .map_err(|_| "scheduled_at must be RFC3339".to_string())?
941                        .with_timezone(&chrono::Utc),
942                )
943            }
944        }
945        None => None,
946    };
947
948    match (cron_expression.is_some(), scheduled_at.is_some()) {
949        (false, false) => {
950            return Err(
951                "schedule must include exactly one of cron_expression (recurring) or scheduled_at (one-shot)"
952                    .to_string(),
953            );
954        }
955        (true, true) => {
956            return Err(
957                "schedule must not include both cron_expression and scheduled_at; provide exactly one"
958                    .to_string(),
959            );
960        }
961        _ => {}
962    }
963
964    let timezone = schedule
965        .get("timezone")
966        .and_then(Value::as_str)
967        .map(str::trim)
968        .filter(|value| !value.is_empty())
969        .unwrap_or("UTC")
970        .to_string();
971
972    Ok(Some(BackgroundScheduleRequest {
973        cron_expression,
974        scheduled_at,
975        timezone,
976    }))
977}
978
979fn build_background_schedule_description(
980    tool_name: &str,
981    tool_args: &Value,
982    title: &str,
983    signal_on_completion: bool,
984) -> String {
985    let payload = json!({
986        "tool": tool_name,
987        "title": title,
988        "signal_on_completion": signal_on_completion,
989        "args": tool_args,
990    });
991    let payload_json =
992        serde_json::to_string_pretty(&payload).unwrap_or_else(|_| payload.to_string());
993
994    format!(
995        "Monitor: {title}\n\n\
996This scheduled monitor fired. Start the background run now.\n\n\
997spawn_background payload:\n{payload_json}"
998    )
999}
1000
1001#[async_trait]
1002impl Tool for SpawnBackgroundTool {
1003    fn name(&self) -> &str {
1004        "spawn_background"
1005    }
1006
1007    fn display_name(&self) -> Option<&str> {
1008        Some("Spawn Background")
1009    }
1010
1011    fn description(&self) -> &str {
1012        "Run a background-capable built-in tool asynchronously. Returns immediately and signals the session when the background run completes."
1013    }
1014
1015    fn parameters_schema(&self) -> Value {
1016        json!({
1017            "type": "object",
1018            "properties": {
1019                "tool": {
1020                    "type": "string",
1021                    "description": "Name of the built-in tool to execute in the background"
1022                },
1023                "args": {
1024                    "type": "object",
1025                    "description": "Arguments to pass to the target tool"
1026                },
1027                "title": {
1028                    "type": "string",
1029                    "description": "Optional human-readable label for the background run"
1030                },
1031                "schedule": {
1032                    "type": "object",
1033                    "description": "Optional session schedule. When provided, this creates a scheduled monitor instead of starting the run immediately.",
1034                    "properties": {
1035                        "cron_expression": {
1036                            "type": "string",
1037                            "description": "Standard 5-field cron expression for recurring runs (e.g. '*/10 * * * *' for every 10 minutes)"
1038                        },
1039                        "scheduled_at": {
1040                            "type": "string",
1041                            "description": "ISO 8601 datetime for a one-shot run (e.g. '2026-04-16T15:30:00Z')"
1042                        },
1043                        "timezone": {
1044                            "type": "string",
1045                            "description": "IANA timezone for the schedule. Default: UTC"
1046                        }
1047                    },
1048                    "additionalProperties": false
1049                },
1050                "signal_on_completion": {
1051                    "type": "boolean",
1052                    "description": "Send a synthetic user message back to the session when the run completes",
1053                    "default": true
1054                }
1055            },
1056            "required": ["tool", "args"],
1057            "additionalProperties": false
1058        })
1059    }
1060
1061    async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
1062        ToolExecutionResult::tool_error(
1063            "spawn_background requires context. This tool must be executed with session context.",
1064        )
1065    }
1066
1067    async fn execute_with_context(
1068        &self,
1069        arguments: Value,
1070        context: &ToolContext,
1071    ) -> ToolExecutionResult {
1072        let tool_name = match arguments.get("tool").and_then(|v| v.as_str()) {
1073            Some(name) if !name.trim().is_empty() => name.trim(),
1074            _ => return ToolExecutionResult::tool_error("Missing required parameter: tool"),
1075        };
1076        let tool_args = match arguments.get("args") {
1077            Some(args) if args.is_object() => args.clone(),
1078            _ => {
1079                return ToolExecutionResult::tool_error(
1080                    "Missing required parameter: args (object expected)",
1081                );
1082            }
1083        };
1084        let signal_on_completion = arguments
1085            .get("signal_on_completion")
1086            .and_then(|v| v.as_bool())
1087            .unwrap_or(true);
1088        let schedule_request = match parse_background_schedule(&arguments) {
1089            Ok(schedule) => schedule,
1090            Err(message) => return ToolExecutionResult::tool_error(message),
1091        };
1092
1093        let Some(tool_registry) = &context.tool_registry else {
1094            return ToolExecutionResult::tool_error(
1095                "Tool registry not available in this context. spawn_background requires worker-side tool execution.",
1096            );
1097        };
1098
1099        let Some(tool) = tool_registry.get(tool_name).cloned() else {
1100            return ToolExecutionResult::tool_error(format!("Unknown tool: {tool_name}"));
1101        };
1102        if tool_name == self.name() {
1103            return ToolExecutionResult::tool_error(
1104                "spawn_background cannot target itself recursively",
1105            );
1106        }
1107        if tool.hints().supports_background != Some(true) {
1108            return ToolExecutionResult::tool_error(format!(
1109                "Tool does not support background execution: {tool_name}"
1110            ));
1111        }
1112        if tool.as_background_executable().is_none() {
1113            return ToolExecutionResult::tool_error(format!(
1114                "Tool declared background support but has no background executor: {tool_name}"
1115            ));
1116        }
1117        let title = arguments
1118            .get("title")
1119            .and_then(|v| v.as_str())
1120            .map(str::trim)
1121            .filter(|s| !s.is_empty())
1122            .map(|s| s.to_string())
1123            .unwrap_or_else(|| {
1124                tool.display_name()
1125                    .map(ToString::to_string)
1126                    .unwrap_or_else(|| format!("Background {tool_name}"))
1127            });
1128
1129        if let Some(schedule_request) = schedule_request {
1130            let Some(schedule_store) = &context.schedule_store else {
1131                return ToolExecutionResult::tool_error(
1132                    "Schedule store not available in this context. Scheduled monitors require session schedules.",
1133                );
1134            };
1135
1136            match schedule_store
1137                .count_active_schedules(context.session_id)
1138                .await
1139            {
1140                Ok(count) if count >= MAX_ACTIVE_SCHEDULES_PER_SESSION => {
1141                    return ToolExecutionResult::tool_error(format!(
1142                        "Maximum {MAX_ACTIVE_SCHEDULES_PER_SESSION} active schedules per session. Cancel an existing schedule first."
1143                    ));
1144                }
1145                Err(err) => return ToolExecutionResult::internal_error(err),
1146                _ => {}
1147            }
1148
1149            let description = build_background_schedule_description(
1150                tool_name,
1151                &tool_args,
1152                &title,
1153                signal_on_completion,
1154            );
1155
1156            return match schedule_store
1157                .create_schedule(
1158                    context.session_id,
1159                    description,
1160                    schedule_request.cron_expression.clone(),
1161                    schedule_request.scheduled_at,
1162                    schedule_request.timezone.clone(),
1163                )
1164                .await
1165            {
1166                Ok(schedule) => {
1167                    // Best-effort: create a monitor task linked to this schedule.
1168                    // The schedule is the source of truth; task creation failure
1169                    // does not fail the spawn.
1170                    let mut monitor_task_id: Option<String> = None;
1171                    if let Some(ref task_registry) = context.session_task_registry {
1172                        let spec = json!({
1173                            "tool": tool_name,
1174                            "arguments": &tool_args,
1175                            "schedule_id": schedule.id.to_string(),
1176                            "schedule_type": schedule.schedule_type,
1177                            "cron_expression": schedule.cron_expression,
1178                            "scheduled_at": schedule.scheduled_at,
1179                            "timezone": schedule.timezone,
1180                            "signal_on_completion": signal_on_completion,
1181                        });
1182                        match task_registry
1183                            .create(crate::session_task::CreateSessionTask {
1184                                session_id: context.session_id,
1185                                id: None,
1186                                kind: crate::session_task::TASK_KIND_MONITOR.to_string(),
1187                                display_name: title.clone(),
1188                                spec,
1189                                state: crate::session_task::SessionTaskState::Running,
1190                                links: crate::session_task::TaskLinks::default(),
1191                                // Silent: the schedule's injected prompt message already
1192                                // wakes the session; a second wake on every fire would be noise.
1193                                wake_policy: crate::session_task::TaskWakePolicy::Silent,
1194                            })
1195                            .await
1196                        {
1197                            Ok(task) => {
1198                                monitor_task_id = Some(task.id);
1199                            }
1200                            Err(e) => {
1201                                tracing::warn!(
1202                                    session_id = %context.session_id,
1203                                    schedule_id = %schedule.id,
1204                                    error = %e,
1205                                    "Failed to create monitor task for schedule (best-effort)"
1206                                );
1207                            }
1208                        }
1209                    }
1210                    ToolExecutionResult::success(json!({
1211                        "created": true,
1212                        "status": "scheduled",
1213                        "title": title,
1214                        "tool": tool_name,
1215                        "signal_on_completion": signal_on_completion,
1216                        "schedule_id": schedule.id.to_string(),
1217                        "schedule_type": schedule.schedule_type,
1218                        "cron_expression": schedule.cron_expression,
1219                        "scheduled_at": schedule.scheduled_at,
1220                        "timezone": schedule.timezone,
1221                        "next_trigger_at": schedule.next_trigger_at,
1222                        "enabled": schedule.enabled,
1223                        "task_id": monitor_task_id,
1224                    }))
1225                }
1226                Err(err) => ToolExecutionResult::internal_error(err),
1227            };
1228        }
1229
1230        let Some(task_registry) = &context.session_task_registry else {
1231            return ToolExecutionResult::tool_error(
1232                "Session task registry not available in this context. Background runs require task tracking.",
1233            );
1234        };
1235        if context.file_store.is_none() {
1236            return ToolExecutionResult::tool_error(
1237                "Session file store not available in this context. spawn_background requires artifact persistence.",
1238            );
1239        }
1240
1241        let background_run_permit = match ACTIVE_BACKGROUND_RUNS_PER_WORKER.try_acquire() {
1242            Ok(permit) => permit,
1243            Err(_) => {
1244                return ToolExecutionResult::tool_error(format!(
1245                    "Worker is already running the maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_WORKER} active background runs. Try again after an existing run finishes."
1246                ));
1247            }
1248        };
1249
1250        let session_run_permit = match try_acquire_session_background_permit(context.session_id) {
1251            Ok(permit) => permit,
1252            Err(_) => {
1253                return ToolExecutionResult::tool_error(format!(
1254                    "Maximum {MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION} active background runs per session. Wait for an existing run to finish before starting another."
1255                ));
1256            }
1257        };
1258
1259        let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
1260        let artifact_dir = format!("/.background/{run_id}");
1261        let log_path = format!("{artifact_dir}/output.log");
1262        let result_path = format!("{artifact_dir}/result.json");
1263
1264        // Create the session task tracking this run (specs/session-tasks.md).
1265        // task_registry is guaranteed Some above.
1266        let (task_id, task_attempt): (Option<String>, i32) = match task_registry
1267            .create(crate::session_task::CreateSessionTask {
1268                session_id: context.session_id,
1269                id: None,
1270                kind: crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string(),
1271                display_name: title.clone(),
1272                spec: json!({
1273                    "tool": tool_name,
1274                    "arguments": &tool_args,
1275                    // Reaper uses this to decide whether to re-run on orphan.
1276                    // Only idempotent/readonly tools are safe to re-execute.
1277                    "reattachable": tool.hints().idempotent.unwrap_or(false)
1278                        || tool.hints().readonly.unwrap_or(false),
1279                    // Persisted so re-attach can restore the original signaling behavior.
1280                    "signal_on_completion": signal_on_completion,
1281                }),
1282                state: crate::session_task::SessionTaskState::Running,
1283                links: crate::session_task::TaskLinks::default(),
1284                wake_policy: crate::session_task::TaskWakePolicy::Silent,
1285            })
1286            .await
1287        {
1288            Ok(task) => (Some(task.id), task.attempt),
1289            Err(e) => {
1290                return ToolExecutionResult::internal_error_msg(format!(
1291                    "Failed to create background run task: {e}"
1292                ));
1293            }
1294        };
1295
1296        let background_context = context.clone().with_tool_registry(tool_registry.clone());
1297        let sink = Arc::new(SessionBackgroundSink::new(
1298            background_context.clone(),
1299            run_id.clone(),
1300            title.clone(),
1301            tool_name.to_string(),
1302            log_path.clone(),
1303            result_path.clone(),
1304            signal_on_completion,
1305            task_id.clone(),
1306        ));
1307        let run_id_for_task = run_id.clone();
1308        let tool_for_task = tool.clone();
1309        let tool_name_for_task = tool_name.to_string();
1310
1311        // Clone registry/ids for the cancel-watcher inside the spawned task.
1312        let cancel_registry = context.session_task_registry.clone();
1313        let cancel_session_id = context.session_id;
1314        let cancel_task_id = task_id.clone();
1315        // Attempt captured at task creation for stale-attempt fencing.
1316        let cancel_task_attempt = task_attempt;
1317
1318        tokio::spawn(async move {
1319            let _background_run_permit = background_run_permit;
1320            let _session_run_permit = session_run_permit;
1321            let _ = sink.status("Starting").await;
1322
1323            // Run the tool future inside a select! against a cancel-watch loop
1324            // when a task registry and task_id are wired up.  The watcher sends
1325            // a heartbeat every ~2 s and checks cancel_requested_at; when set,
1326            // it wins the select and the tool future is dropped.
1327            //
1328            // Rationale: cancel intent is recorded in shared storage so this
1329            // design works even when cancel_task executes on a different worker.
1330            let outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult> = match (
1331                cancel_registry.as_ref(),
1332                cancel_task_id.as_deref(),
1333            ) {
1334                (Some(registry), Some(task_id_str)) => {
1335                    let registry = registry.clone();
1336                    let task_id_str = task_id_str.to_string();
1337                    let tool_fut = async {
1338                        match tool_for_task.as_background_executable() {
1339                            Some(background_tool) => {
1340                                background_tool
1341                                    .execute_background(tool_args, background_context, sink.clone())
1342                                    .await
1343                            }
1344                            None => Err(ToolExecutionResult::tool_error(format!(
1345                                "Tool declared background support but has no background executor: {}",
1346                                tool_name_for_task
1347                            ))),
1348                        }
1349                    };
1350                    let watch_fut = async {
1351                        loop {
1352                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1353                            // Send heartbeat with stale-attempt fencing so a
1354                            // superseded executor's heartbeats are rejected once
1355                            // the reaper increments the attempt counter.
1356                            let _ = registry
1357                                .update(
1358                                    cancel_session_id,
1359                                    &task_id_str,
1360                                    crate::session_task::SessionTaskUpdate {
1361                                        heartbeat_at: Some(chrono::Utc::now()),
1362                                        expected_attempt: Some(cancel_task_attempt),
1363                                        ..Default::default()
1364                                    },
1365                                )
1366                                .await;
1367                            // Check for cooperative cancel intent.
1368                            if let Ok(Some(task)) =
1369                                registry.get(cancel_session_id, &task_id_str).await
1370                                && task.cancel_requested_at.is_some()
1371                            {
1372                                break;
1373                            }
1374                        }
1375                    };
1376                    tokio::select! {
1377                        result = tool_fut => result,
1378                        () = watch_fut => {
1379                            // Cancel was requested; the tool future is dropped.
1380                            Err(ToolExecutionResult::ToolError(
1381                                BACKGROUND_CANCEL_SENTINEL.to_string(),
1382                            ))
1383                        }
1384                    }
1385                }
1386                // No registry or no task_id: run without cancel watch (unchanged behaviour).
1387                _ => match tool_for_task.as_background_executable() {
1388                    Some(background_tool) => {
1389                        background_tool
1390                            .execute_background(tool_args, background_context, sink.clone())
1391                            .await
1392                    }
1393                    None => Err(ToolExecutionResult::tool_error(format!(
1394                        "Tool declared background support but has no background executor: {}",
1395                        tool_name_for_task
1396                    ))),
1397                },
1398            };
1399
1400            // Route canceled outcome through finalize_canceled so file/resource
1401            // cleanup is consistent with the success/fail paths.
1402            let finalize_result = if is_canceled_outcome(&outcome) {
1403                sink.finalize_canceled().await
1404            } else {
1405                sink.finalize(outcome).await
1406            };
1407            if let Err(err) = finalize_result {
1408                tracing::warn!(
1409                    run_id = run_id_for_task,
1410                    error = %err,
1411                    "Background run finalization failed"
1412                );
1413            }
1414        });
1415
1416        ToolExecutionResult::success(json!({
1417            "run_id": run_id,
1418            "resource_id": run_id,
1419            "task_id": task_id,
1420            "title": title,
1421            "tool": tool_name,
1422            "status": "running",
1423            "signal_on_completion": signal_on_completion,
1424            "artifact_dir": artifact_dir,
1425            "log_path": log_path,
1426            "result_path": result_path
1427        }))
1428    }
1429
1430    fn requires_context(&self) -> bool {
1431        true
1432    }
1433}
1434
1435#[derive(Debug, Default)]
1436struct SessionBackgroundState {
1437    status_text: String,
1438    progress: Option<BackgroundProgress>,
1439    output_tail: String,
1440    output_log: String,
1441    output_log_chars: usize,
1442    output_log_truncated: bool,
1443}
1444
1445const MAX_BACKGROUND_OUTPUT_LOG_CHARS: usize = 256 * 1024;
1446
1447struct SessionBackgroundSink {
1448    context: ToolContext,
1449    run_id: String,
1450    display_name: String,
1451    tool_name: String,
1452    log_path: String,
1453    result_path: String,
1454    signal_on_completion: bool,
1455    /// Session task mirroring this run; None when no task registry is wired.
1456    task_id: Option<String>,
1457    state: tokio::sync::Mutex<SessionBackgroundState>,
1458}
1459
1460impl SessionBackgroundSink {
1461    #[allow(clippy::too_many_arguments)]
1462    fn new(
1463        context: ToolContext,
1464        run_id: String,
1465        display_name: String,
1466        tool_name: String,
1467        log_path: String,
1468        result_path: String,
1469        signal_on_completion: bool,
1470        task_id: Option<String>,
1471    ) -> Self {
1472        Self {
1473            context,
1474            run_id,
1475            display_name,
1476            tool_name,
1477            log_path,
1478            result_path,
1479            signal_on_completion,
1480            task_id,
1481            state: tokio::sync::Mutex::new(SessionBackgroundState {
1482                status_text: "Queued".to_string(),
1483                ..Default::default()
1484            }),
1485        }
1486    }
1487
1488    /// Mirror an update onto the session task (best-effort).
1489    async fn mirror_task(&self, update: crate::session_task::SessionTaskUpdate) {
1490        let (Some(registry), Some(task_id)) = (&self.context.session_task_registry, &self.task_id)
1491        else {
1492            return;
1493        };
1494        let _ = registry
1495            .update(self.context.session_id, task_id, update)
1496            .await;
1497    }
1498
1499    async fn finalize_canceled(&self) -> Result<()> {
1500        let output_log = {
1501            let state = self.state.lock().await;
1502            let mut log = Self::final_output_log(&state);
1503            log.push_str("\nCanceled by request.\n");
1504            log
1505        };
1506        self.write_text_file(&self.log_path, &output_log).await?;
1507        let result_json = serde_json::to_string_pretty(&serde_json::json!({"status": "canceled"}))
1508            .unwrap_or_else(|_| r#"{"status":"canceled"}"#.to_string());
1509        self.write_text_file(&self.result_path, &result_json)
1510            .await?;
1511
1512        let mut state = self.state.lock().await;
1513        state.status_text = "Canceled".to_string();
1514        drop(state);
1515
1516        self.mirror_task(crate::session_task::SessionTaskUpdate {
1517            state: Some(crate::session_task::SessionTaskState::Canceled),
1518            summary: Some("Canceled by request.".to_string()),
1519            result_path: Some(self.result_path.clone()),
1520            ..Default::default()
1521        })
1522        .await;
1523        if self.signal_on_completion {
1524            self.signal_session("canceled", "Canceled by request.")
1525                .await?;
1526        }
1527        Ok(())
1528    }
1529
1530    async fn finalize(
1531        &self,
1532        outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1533    ) -> Result<()> {
1534        match outcome {
1535            Ok(outcome) => {
1536                let output_log = if let Some(raw_output) = &outcome.raw_output {
1537                    raw_output.clone()
1538                } else {
1539                    let state = self.state.lock().await;
1540                    Self::final_output_log(&state)
1541                };
1542                self.write_text_file(&self.log_path, &output_log).await?;
1543                let result_json = serde_json::to_string_pretty(&outcome.result)
1544                    .unwrap_or_else(|_| outcome.result.to_string());
1545                self.write_text_file(&self.result_path, &result_json)
1546                    .await?;
1547
1548                let mut state = self.state.lock().await;
1549                state.status_text = "Completed".to_string();
1550                drop(state);
1551                self.mirror_task(crate::session_task::SessionTaskUpdate {
1552                    state: Some(crate::session_task::SessionTaskState::Succeeded),
1553                    summary: Some(outcome.summary.clone()),
1554                    result_path: Some(self.result_path.clone()),
1555                    ..Default::default()
1556                })
1557                .await;
1558                if self.signal_on_completion {
1559                    self.signal_session("completed", &outcome.summary).await?;
1560                }
1561            }
1562            Err(err) => {
1563                let message = match err {
1564                    ToolExecutionResult::ToolError(msg) => msg,
1565                    ToolExecutionResult::InternalError(inner) => inner.message,
1566                    ToolExecutionResult::ConnectionRequired { provider } => {
1567                        format!("Background tool requires connection setup: {provider}")
1568                    }
1569                    ToolExecutionResult::Success(_)
1570                    | ToolExecutionResult::SuccessWithImages { .. } => {
1571                        "Background run ended unexpectedly".to_string()
1572                    }
1573                };
1574                let output_log = {
1575                    let state = self.state.lock().await;
1576                    Self::final_output_log(&state)
1577                };
1578                self.write_text_file(&self.log_path, &output_log).await?;
1579                let error_json = serde_json::to_string_pretty(&json!({
1580                    "status": "failed",
1581                    "error": &message,
1582                }))
1583                .unwrap_or_else(|_| {
1584                    json!({
1585                        "status": "failed",
1586                        "error": &message,
1587                    })
1588                    .to_string()
1589                });
1590                self.write_text_file(&self.result_path, &error_json).await?;
1591                let mut state = self.state.lock().await;
1592                state.status_text = "Failed".to_string();
1593                drop(state);
1594                self.mirror_task(crate::session_task::SessionTaskUpdate {
1595                    state: Some(crate::session_task::SessionTaskState::Failed),
1596                    summary: Some(message.clone()),
1597                    result_path: Some(self.result_path.clone()),
1598                    error: Some(crate::session_task::TaskError {
1599                        kind: "error".to_string(),
1600                        message: message.clone(),
1601                    }),
1602                    ..Default::default()
1603                })
1604                .await;
1605                if self.signal_on_completion {
1606                    self.signal_session("failed", &message).await?;
1607                }
1608            }
1609        }
1610
1611        Ok(())
1612    }
1613
1614    async fn signal_session(&self, status: &str, summary: &str) -> Result<()> {
1615        let Some(platform_store) = &self.context.platform_store else {
1616            return Ok(());
1617        };
1618        let message = format!(
1619            "Background run {status}.\n- run_id: {}\n- title: {}\n- tool: {}\n- summary: {}\n- result_path: {}\n- log_path: {}",
1620            self.run_id,
1621            self.display_name,
1622            self.tool_name,
1623            summary,
1624            self.result_path,
1625            self.log_path
1626        );
1627        platform_store
1628            .send_message(self.context.session_id, &message)
1629            .await
1630    }
1631
1632    async fn write_text_file(&self, path: &str, content: &str) -> Result<()> {
1633        let file_store = self.context.file_store.as_ref().ok_or_else(|| {
1634            anyhow::anyhow!(
1635                "background run {} cannot persist artifact {} because no session file store is configured",
1636                self.run_id,
1637                path
1638            )
1639        })?;
1640
1641        ensure_directory(file_store.as_ref(), self.context.session_id, "/.background").await?;
1642        let run_dir = format!("/.background/{}", self.run_id);
1643        ensure_directory(file_store.as_ref(), self.context.session_id, &run_dir).await?;
1644        file_store
1645            .write_file(self.context.session_id, path, content, "text")
1646            .await?;
1647        Ok(())
1648    }
1649}
1650
1651#[async_trait]
1652impl BackgroundEventSink for SessionBackgroundSink {
1653    async fn status(&self, message: &str) -> Result<()> {
1654        let mut state = self.state.lock().await;
1655        state.status_text = message.to_string();
1656        drop(state);
1657        self.mirror_task(crate::session_task::SessionTaskUpdate {
1658            state_detail: Some(message.to_string()),
1659            ..Default::default()
1660        })
1661        .await;
1662        Ok(())
1663    }
1664
1665    async fn output(&self, stream: &str, delta: &str) -> Result<()> {
1666        let mut state = self.state.lock().await;
1667        if !delta.is_empty() {
1668            let prefix = format!("[{stream}] ");
1669            state.output_tail.push_str(&prefix);
1670            state.output_tail.push_str(delta);
1671            Self::append_to_output_log(&mut state, &prefix, delta);
1672            if state.output_tail.chars().count() > 2048 {
1673                state.output_tail = state
1674                    .output_tail
1675                    .chars()
1676                    .rev()
1677                    .take(2048)
1678                    .collect::<Vec<_>>()
1679                    .into_iter()
1680                    .rev()
1681                    .collect();
1682            }
1683        }
1684        Ok(())
1685    }
1686
1687    async fn progress(&self, progress: BackgroundProgress) -> Result<()> {
1688        let mut state = self.state.lock().await;
1689        state.progress = Some(progress.clone());
1690        drop(state);
1691        self.mirror_task(crate::session_task::SessionTaskUpdate {
1692            progress: Some(progress),
1693            ..Default::default()
1694        })
1695        .await;
1696        Ok(())
1697    }
1698}
1699
1700impl SessionBackgroundSink {
1701    fn append_to_output_log(state: &mut SessionBackgroundState, prefix: &str, delta: &str) {
1702        if state.output_log_chars >= MAX_BACKGROUND_OUTPUT_LOG_CHARS {
1703            state.output_log_truncated = true;
1704            return;
1705        }
1706
1707        let chunk = format!("{prefix}{delta}");
1708        let remaining = MAX_BACKGROUND_OUTPUT_LOG_CHARS - state.output_log_chars;
1709        let chunk_chars = chunk.chars().count();
1710
1711        if chunk_chars <= remaining {
1712            state.output_log.push_str(&chunk);
1713            state.output_log_chars += chunk_chars;
1714            return;
1715        }
1716
1717        let truncated_chunk: String = chunk.chars().take(remaining).collect();
1718        state.output_log.push_str(&truncated_chunk);
1719        state.output_log_chars += truncated_chunk.chars().count();
1720        state.output_log_truncated = true;
1721    }
1722
1723    fn final_output_log(state: &SessionBackgroundState) -> String {
1724        if !state.output_log_truncated {
1725            return state.output_log.clone();
1726        }
1727
1728        format!(
1729            "{}\n[system] background output truncated at {} characters\n",
1730            state.output_log, MAX_BACKGROUND_OUTPUT_LOG_CHARS
1731        )
1732    }
1733}
1734
1735/// Internal sentinel injected by the cancel-watch select branch. Namespaced
1736/// so a background tool that legitimately fails with "canceled" is never
1737/// misclassified as a cooperative cancel.
1738const BACKGROUND_CANCEL_SENTINEL: &str = "__everruns_background_cancel__";
1739
1740/// Returns true when an outcome from the cancel-watch select is the sentinel
1741/// cancel signal.  Factored out so the condition is testable without a
1742/// running async runtime.
1743fn is_canceled_outcome(
1744    outcome: &std::result::Result<BackgroundOutcome, ToolExecutionResult>,
1745) -> bool {
1746    matches!(outcome, Err(ToolExecutionResult::ToolError(msg)) if msg == BACKGROUND_CANCEL_SENTINEL)
1747}
1748
1749/// Re-attach a `background_tool` task after worker loss.
1750///
1751/// Called by `BackgroundToolTaskExecutor::start()` when the reaper decides the
1752/// task is safe to restart. Reads `spec["tool"]` and `spec["arguments"]` from
1753/// the task, looks up the tool in the built-in default registry, and spawns a
1754/// fresh background run with `task.attempt` as the heartbeat fence. New artifact
1755/// paths are generated so old partial artifacts do not conflict.
1756///
1757/// Returns an error (→ reaper falls back to orphaned-fail) when:
1758/// - `context.file_store` or `context.session_task_registry` is absent
1759/// - `spec["tool"]` is absent or empty
1760/// - the tool is not in the built-in default registry
1761/// - the tool does not implement `BackgroundExecutable`
1762/// - the tool's current hints are not `idempotent` or `readonly`
1763/// - per-worker or per-session background concurrency caps are exhausted
1764pub(crate) async fn reattach_background_run(
1765    task: &crate::session_task::SessionTask,
1766    context: &crate::traits::ToolContext,
1767) -> crate::error::Result<()> {
1768    // Fail fast before spawning a tokio task so the reaper can fall back to
1769    // orphaned-fail rather than leaving the task stuck in Running forever.
1770    if context.file_store.is_none() {
1771        return Err(crate::error::AgentLoopError::tool(
1772            "file store not available; cannot re-attach background run",
1773        ));
1774    }
1775    if context.session_task_registry.is_none() {
1776        return Err(crate::error::AgentLoopError::tool(
1777            "task registry not available; cannot re-attach background run",
1778        ));
1779    }
1780
1781    let tool_name: String = task
1782        .spec
1783        .get("tool")
1784        .and_then(|v| v.as_str())
1785        .filter(|s| !s.is_empty())
1786        .map(str::to_owned)
1787        .ok_or_else(|| {
1788            crate::error::AgentLoopError::tool(
1789                "background_tool spec missing 'tool' field; cannot re-attach",
1790            )
1791        })?;
1792
1793    let tool_args = task
1794        .spec
1795        .get("arguments")
1796        .cloned()
1797        .unwrap_or_else(|| serde_json::Value::Object(Default::default()));
1798
1799    let registry = std::sync::Arc::new(ToolRegistry::with_defaults());
1800
1801    let Some(tool) = registry.get(&tool_name).cloned() else {
1802        return Err(crate::error::AgentLoopError::tool(format!(
1803            "tool '{tool_name}' not found in built-in registry; cannot re-attach"
1804        )));
1805    };
1806
1807    if tool.as_background_executable().is_none() {
1808        return Err(crate::error::AgentLoopError::tool(format!(
1809            "tool '{tool_name}' does not support background execution; cannot re-attach"
1810        )));
1811    }
1812
1813    // Re-verify tool hints from the live registry rather than trusting
1814    // spec["reattachable"], which could be forged via task creation APIs.
1815    let hints = tool.hints();
1816    if !hints.idempotent.unwrap_or(false) && !hints.readonly.unwrap_or(false) {
1817        return Err(crate::error::AgentLoopError::tool(format!(
1818            "tool '{tool_name}' is not idempotent or readonly; re-attach declined",
1819        )));
1820    }
1821
1822    // Enforce the same concurrency caps as spawn_background so many concurrent
1823    // re-attaches cannot exhaust worker or session limits.
1824    let background_run_permit = ACTIVE_BACKGROUND_RUNS_PER_WORKER
1825        .try_acquire()
1826        .map_err(|_| {
1827            crate::error::AgentLoopError::tool(
1828                "worker background run limit reached; re-attach deferred",
1829            )
1830        })?;
1831    let session_run_permit =
1832        try_acquire_session_background_permit(task.session_id).map_err(|_| {
1833            crate::error::AgentLoopError::tool(
1834                "session background run limit reached; re-attach deferred",
1835            )
1836        })?;
1837
1838    // Restore original signaling behavior; default true for tasks created before
1839    // this field was persisted.
1840    let signal_on_completion = task
1841        .spec
1842        .get("signal_on_completion")
1843        .and_then(|v| v.as_bool())
1844        .unwrap_or(true);
1845
1846    let run_id = format!("bg_{}", uuid::Uuid::now_v7().simple());
1847    let artifact_dir = format!("/.background/{run_id}");
1848    let log_path = format!("{artifact_dir}/output.log");
1849    let result_path = format!("{artifact_dir}/result.json");
1850
1851    let task_id = task.id.clone();
1852    let task_attempt = task.attempt;
1853    let session_id = task.session_id;
1854
1855    let sink_context = context.clone().with_tool_registry(registry);
1856    let sink = std::sync::Arc::new(SessionBackgroundSink::new(
1857        sink_context.clone(),
1858        run_id.clone(),
1859        task.display_name.clone(),
1860        tool_name.to_string(),
1861        log_path,
1862        result_path,
1863        signal_on_completion,
1864        Some(task_id.clone()),
1865    ));
1866
1867    let cancel_registry = context.session_task_registry.clone();
1868    let run_id_for_log = run_id.clone();
1869
1870    tokio::spawn(async move {
1871        // Hold permits for the duration of the re-attached run.
1872        let _background_run_permit = background_run_permit;
1873        let _session_run_permit = session_run_permit;
1874        let _ = sink.status("Re-attaching").await;
1875
1876        let outcome: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
1877            match (cancel_registry.as_ref(), Some(task_id.as_str())) {
1878                (Some(registry), Some(task_id_str)) => {
1879                    let registry = registry.clone();
1880                    let task_id_str = task_id_str.to_string();
1881                    let tool_fut = async {
1882                        match tool.as_background_executable() {
1883                            Some(bg) => {
1884                                bg.execute_background(tool_args, sink_context.clone(), sink.clone())
1885                                    .await
1886                            }
1887                            None => Err(ToolExecutionResult::tool_error(format!(
1888                                "tool '{tool_name}' lost background support during re-attach"
1889                            ))),
1890                        }
1891                    };
1892                    let watch_fut = async {
1893                        loop {
1894                            tokio::time::sleep(std::time::Duration::from_secs(2)).await;
1895                            let _ = registry
1896                                .update(
1897                                    session_id,
1898                                    &task_id_str,
1899                                    crate::session_task::SessionTaskUpdate {
1900                                        heartbeat_at: Some(chrono::Utc::now()),
1901                                        expected_attempt: Some(task_attempt),
1902                                        ..Default::default()
1903                                    },
1904                                )
1905                                .await;
1906                            if let Ok(Some(t)) = registry.get(session_id, &task_id_str).await
1907                                && t.cancel_requested_at.is_some()
1908                            {
1909                                break;
1910                            }
1911                        }
1912                    };
1913                    tokio::select! {
1914                        result = tool_fut => result,
1915                        () = watch_fut => Err(ToolExecutionResult::ToolError(
1916                            BACKGROUND_CANCEL_SENTINEL.to_string(),
1917                        )),
1918                    }
1919                }
1920                _ => match tool.as_background_executable() {
1921                    Some(bg) => {
1922                        bg.execute_background(tool_args, sink_context, sink.clone())
1923                            .await
1924                    }
1925                    None => Err(ToolExecutionResult::tool_error(format!(
1926                        "tool '{tool_name}' lost background support during re-attach"
1927                    ))),
1928                },
1929            };
1930
1931        let finalize_result = if is_canceled_outcome(&outcome) {
1932            sink.finalize_canceled().await
1933        } else {
1934            sink.finalize(outcome).await
1935        };
1936        if let Err(err) = finalize_result {
1937            tracing::warn!(
1938                run_id = run_id_for_log,
1939                error = %err,
1940                "Background run re-attach finalization failed"
1941            );
1942        }
1943    });
1944
1945    Ok(())
1946}
1947
1948async fn ensure_directory(
1949    file_store: &dyn crate::traits::SessionFileSystem,
1950    session_id: crate::SessionId,
1951    path: &str,
1952) -> Result<()> {
1953    if let Some(entry) = file_store.stat_file(session_id, path).await? {
1954        if entry.is_directory {
1955            return Ok(());
1956        }
1957        return Err(anyhow::anyhow!("path exists but is not a directory: {path}").into());
1958    }
1959    let _ = file_store.create_directory(session_id, path).await?;
1960    Ok(())
1961}
1962
1963/// A tool that always fails (useful for testing error handling)
1964pub struct FailingTool {
1965    error_message: String,
1966    use_internal_error: bool,
1967}
1968
1969impl FailingTool {
1970    /// Create a failing tool with a tool-level error
1971    pub fn with_tool_error(message: impl Into<String>) -> Self {
1972        Self {
1973            error_message: message.into(),
1974            use_internal_error: false,
1975        }
1976    }
1977
1978    /// Create a failing tool with an internal error
1979    pub fn with_internal_error(message: impl Into<String>) -> Self {
1980        Self {
1981            error_message: message.into(),
1982            use_internal_error: true,
1983        }
1984    }
1985}
1986
1987impl Default for FailingTool {
1988    fn default() -> Self {
1989        Self::with_tool_error("Tool execution failed")
1990    }
1991}
1992
1993#[async_trait]
1994impl Tool for FailingTool {
1995    fn name(&self) -> &str {
1996        "failing_tool"
1997    }
1998
1999    fn display_name(&self) -> Option<&str> {
2000        Some("Failing Tool")
2001    }
2002
2003    fn description(&self) -> &str {
2004        "A tool that always fails (for testing error handling)"
2005    }
2006
2007    fn parameters_schema(&self) -> Value {
2008        serde_json::json!({
2009            "type": "object",
2010            "properties": {},
2011            "additionalProperties": false
2012        })
2013    }
2014
2015    fn hints(&self) -> ToolHints {
2016        ToolHints::default()
2017            .with_readonly(true)
2018            .with_idempotent(true)
2019    }
2020
2021    async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2022        if self.use_internal_error {
2023            ToolExecutionResult::internal_error_msg(&self.error_message)
2024        } else {
2025            ToolExecutionResult::tool_error(&self.error_message)
2026        }
2027    }
2028}
2029
2030// ============================================================================
2031// Tests
2032// ============================================================================
2033
2034#[cfg(test)]
2035mod tests {
2036    use super::*;
2037    use crate::capabilities::GetCurrentTimeTool;
2038    use crate::platform_store::PlatformStore;
2039    use crate::session_file::{FileInfo, FileStat, SessionFile};
2040    use crate::session_task::SessionTaskRegistry;
2041    use crate::traits::{SessionFileSystem, SessionScheduleStore};
2042    use crate::typed_id::{HarnessId, SessionId};
2043    use crate::{AgentId, KeyInfo, PlatformMessage, SecretInfo};
2044    use async_trait::async_trait;
2045    use std::sync::{
2046        Arc as StdArc, Mutex,
2047        atomic::{AtomicBool, Ordering},
2048    };
2049
2050    #[derive(Default)]
2051    struct TestBackgroundTool;
2052
2053    #[async_trait]
2054    impl BackgroundExecutableTool for TestBackgroundTool {
2055        async fn execute_background(
2056            &self,
2057            arguments: Value,
2058            _context: ToolContext,
2059            sink: Arc<dyn BackgroundEventSink>,
2060        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2061            sink.status("Waiting for test result")
2062                .await
2063                .map_err(ToolExecutionResult::internal_error)?;
2064            sink.output("stdout", "hello from background")
2065                .await
2066                .map_err(ToolExecutionResult::internal_error)?;
2067            sink.progress(BackgroundProgress {
2068                current: Some(1),
2069                total: Some(1),
2070                unit: Some("step".to_string()),
2071                label: Some("done".to_string()),
2072            })
2073            .await
2074            .map_err(ToolExecutionResult::internal_error)?;
2075
2076            Ok(BackgroundOutcome {
2077                summary: arguments["summary"].as_str().unwrap_or("done").to_string(),
2078                result: json!({"ok": true}),
2079                raw_output: None,
2080            })
2081        }
2082    }
2083
2084    #[async_trait]
2085    impl Tool for TestBackgroundTool {
2086        fn name(&self) -> &str {
2087            "test_background"
2088        }
2089
2090        fn display_name(&self) -> Option<&str> {
2091            Some("Test Background")
2092        }
2093
2094        fn description(&self) -> &str {
2095            "test tool"
2096        }
2097
2098        fn parameters_schema(&self) -> Value {
2099            json!({
2100                "type": "object",
2101                "properties": {
2102                    "summary": { "type": "string" }
2103                }
2104            })
2105        }
2106
2107        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2108            ToolExecutionResult::tool_error("foreground unsupported")
2109        }
2110
2111        fn hints(&self) -> ToolHints {
2112            ToolHints::default().with_supports_background(true)
2113        }
2114
2115        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2116            Some(self)
2117        }
2118    }
2119
2120    #[derive(Default)]
2121    struct TestFailingBackgroundTool;
2122
2123    #[async_trait]
2124    impl BackgroundExecutableTool for TestFailingBackgroundTool {
2125        async fn execute_background(
2126            &self,
2127            _arguments: Value,
2128            _context: ToolContext,
2129            sink: Arc<dyn BackgroundEventSink>,
2130        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2131            sink.status("Running failing test")
2132                .await
2133                .map_err(ToolExecutionResult::internal_error)?;
2134            sink.output("stderr", "background failed")
2135                .await
2136                .map_err(ToolExecutionResult::internal_error)?;
2137            Err(ToolExecutionResult::tool_error("boom"))
2138        }
2139    }
2140
2141    #[async_trait]
2142    impl Tool for TestFailingBackgroundTool {
2143        fn name(&self) -> &str {
2144            "test_background_fail"
2145        }
2146
2147        fn display_name(&self) -> Option<&str> {
2148            Some("Test Background Fail")
2149        }
2150
2151        fn description(&self) -> &str {
2152            "failing background test tool"
2153        }
2154
2155        fn parameters_schema(&self) -> Value {
2156            json!({
2157                "type": "object",
2158                "properties": {}
2159            })
2160        }
2161
2162        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2163            ToolExecutionResult::tool_error("foreground unsupported")
2164        }
2165
2166        fn hints(&self) -> ToolHints {
2167            ToolHints::default().with_supports_background(true)
2168        }
2169
2170        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2171            Some(self)
2172        }
2173    }
2174
2175    #[derive(Default)]
2176    struct TestLargeOutputBackgroundTool;
2177
2178    #[async_trait]
2179    impl BackgroundExecutableTool for TestLargeOutputBackgroundTool {
2180        async fn execute_background(
2181            &self,
2182            _arguments: Value,
2183            _context: ToolContext,
2184            sink: Arc<dyn BackgroundEventSink>,
2185        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2186            let large_chunk = "x".repeat(MAX_BACKGROUND_OUTPUT_LOG_CHARS + 4096);
2187            sink.output("stdout", &large_chunk)
2188                .await
2189                .map_err(ToolExecutionResult::internal_error)?;
2190            Ok(BackgroundOutcome {
2191                summary: "large output complete".to_string(),
2192                result: json!({"ok": true}),
2193                raw_output: None,
2194            })
2195        }
2196    }
2197
2198    #[async_trait]
2199    impl Tool for TestLargeOutputBackgroundTool {
2200        fn name(&self) -> &str {
2201            "test_background_large_output"
2202        }
2203
2204        fn display_name(&self) -> Option<&str> {
2205            Some("Test Background Large Output")
2206        }
2207
2208        fn description(&self) -> &str {
2209            "background test tool with huge output"
2210        }
2211
2212        fn parameters_schema(&self) -> Value {
2213            json!({
2214                "type": "object",
2215                "properties": {}
2216            })
2217        }
2218
2219        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2220            ToolExecutionResult::tool_error("foreground unsupported")
2221        }
2222
2223        fn hints(&self) -> ToolHints {
2224            ToolHints::default().with_supports_background(true)
2225        }
2226
2227        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2228            Some(self)
2229        }
2230    }
2231
2232    struct BlockingBackgroundTool {
2233        release: StdArc<AtomicBool>,
2234    }
2235
2236    #[async_trait]
2237    impl BackgroundExecutableTool for BlockingBackgroundTool {
2238        async fn execute_background(
2239            &self,
2240            _arguments: Value,
2241            _context: ToolContext,
2242            sink: Arc<dyn BackgroundEventSink>,
2243        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
2244            sink.status("Blocking until released")
2245                .await
2246                .map_err(ToolExecutionResult::internal_error)?;
2247            while !self.release.load(Ordering::SeqCst) {
2248                tokio::time::sleep(std::time::Duration::from_millis(10)).await;
2249            }
2250            Ok(BackgroundOutcome {
2251                summary: "released".to_string(),
2252                result: json!({"ok": true}),
2253                raw_output: None,
2254            })
2255        }
2256    }
2257
2258    #[async_trait]
2259    impl Tool for BlockingBackgroundTool {
2260        fn name(&self) -> &str {
2261            "test_background_blocking"
2262        }
2263
2264        fn display_name(&self) -> Option<&str> {
2265            Some("Test Background Blocking")
2266        }
2267
2268        fn description(&self) -> &str {
2269            "background test tool that waits for test release"
2270        }
2271
2272        fn parameters_schema(&self) -> Value {
2273            json!({
2274                "type": "object",
2275                "properties": {}
2276            })
2277        }
2278
2279        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
2280            ToolExecutionResult::tool_error("foreground unsupported")
2281        }
2282
2283        fn hints(&self) -> ToolHints {
2284            ToolHints::default().with_supports_background(true)
2285        }
2286
2287        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
2288            Some(self)
2289        }
2290    }
2291
2292    #[derive(Default)]
2293    struct TestFileStore {
2294        files: Mutex<HashMap<String, SessionFile>>,
2295    }
2296
2297    #[async_trait]
2298    impl crate::traits::SessionFileSystem for TestFileStore {
2299        async fn read_file(
2300            &self,
2301            _session_id: SessionId,
2302            path: &str,
2303        ) -> crate::Result<Option<SessionFile>> {
2304            Ok(self.files.lock().unwrap().get(path).cloned())
2305        }
2306
2307        async fn write_file(
2308            &self,
2309            session_id: SessionId,
2310            path: &str,
2311            content: &str,
2312            encoding: &str,
2313        ) -> crate::Result<SessionFile> {
2314            let now = chrono::Utc::now();
2315            let file = SessionFile {
2316                id: uuid::Uuid::now_v7(),
2317                session_id: session_id.uuid(),
2318                path: path.to_string(),
2319                name: FileInfo::name_from_path(path),
2320                content: Some(content.to_string()),
2321                encoding: encoding.to_string(),
2322                is_directory: false,
2323                is_readonly: false,
2324                size_bytes: content.len() as i64,
2325                created_at: now,
2326                updated_at: now,
2327            };
2328            self.files
2329                .lock()
2330                .unwrap()
2331                .insert(path.to_string(), file.clone());
2332            Ok(file)
2333        }
2334
2335        async fn delete_file(
2336            &self,
2337            _session_id: SessionId,
2338            _path: &str,
2339            _recursive: bool,
2340        ) -> crate::Result<bool> {
2341            Ok(false)
2342        }
2343
2344        async fn list_directory(
2345            &self,
2346            _session_id: SessionId,
2347            _path: &str,
2348        ) -> crate::Result<Vec<FileInfo>> {
2349            Ok(Vec::new())
2350        }
2351
2352        async fn stat_file(
2353            &self,
2354            _session_id: SessionId,
2355            path: &str,
2356        ) -> crate::Result<Option<FileStat>> {
2357            let file = self.files.lock().unwrap().get(path).cloned();
2358            Ok(file.map(|entry| FileStat {
2359                path: entry.path,
2360                name: entry.name,
2361                is_directory: entry.is_directory,
2362                is_readonly: entry.is_readonly,
2363                size_bytes: entry.size_bytes,
2364                created_at: entry.created_at,
2365                updated_at: entry.updated_at,
2366            }))
2367        }
2368
2369        async fn grep_files(
2370            &self,
2371            _session_id: SessionId,
2372            _pattern: &str,
2373            _path_pattern: Option<&str>,
2374        ) -> crate::Result<Vec<crate::session_file::GrepMatch>> {
2375            Ok(Vec::new())
2376        }
2377
2378        async fn create_directory(
2379            &self,
2380            session_id: SessionId,
2381            path: &str,
2382        ) -> crate::Result<FileInfo> {
2383            let now = chrono::Utc::now();
2384            let id = uuid::Uuid::now_v7();
2385            let dir = SessionFile {
2386                id,
2387                session_id: session_id.uuid(),
2388                path: path.to_string(),
2389                name: FileInfo::name_from_path(path),
2390                content: None,
2391                encoding: "text".to_string(),
2392                is_directory: true,
2393                is_readonly: false,
2394                size_bytes: 0,
2395                created_at: now,
2396                updated_at: now,
2397            };
2398            self.files.lock().unwrap().insert(path.to_string(), dir);
2399            Ok(FileInfo {
2400                id,
2401                session_id: session_id.uuid(),
2402                path: path.to_string(),
2403                name: FileInfo::name_from_path(path),
2404                is_directory: true,
2405                is_readonly: false,
2406                size_bytes: 0,
2407                created_at: now,
2408                updated_at: now,
2409            })
2410        }
2411    }
2412
2413    #[derive(Default)]
2414    struct TestPlatformStore {
2415        sent_messages: Mutex<Vec<String>>,
2416    }
2417
2418    #[async_trait]
2419    impl PlatformStore for TestPlatformStore {
2420        async fn list_harnesses(&self) -> crate::Result<Vec<crate::Harness>> {
2421            Ok(Vec::new())
2422        }
2423        async fn get_harness(&self, _id: HarnessId) -> crate::Result<Option<crate::Harness>> {
2424            Ok(None)
2425        }
2426        async fn create_harness(
2427            &self,
2428            _name: &str,
2429            _display_name: Option<&str>,
2430            _description: Option<&str>,
2431            _system_prompt: Option<&str>,
2432            _parent_harness_id: Option<HarnessId>,
2433            _capabilities: &[String],
2434        ) -> crate::Result<crate::Harness> {
2435            unreachable!()
2436        }
2437        async fn update_harness(
2438            &self,
2439            _id: HarnessId,
2440            _name: Option<&str>,
2441            _display_name: Option<&str>,
2442            _description: Option<&str>,
2443            _system_prompt: Option<&str>,
2444            _parent_harness_id: Option<Option<HarnessId>>,
2445        ) -> crate::Result<crate::Harness> {
2446            unreachable!()
2447        }
2448        async fn delete_harness(&self, _id: HarnessId) -> crate::Result<()> {
2449            Ok(())
2450        }
2451        async fn copy_harness(
2452            &self,
2453            _id: HarnessId,
2454            _new_name: Option<&str>,
2455        ) -> crate::Result<crate::Harness> {
2456            unreachable!()
2457        }
2458        async fn list_agents(&self) -> crate::Result<Vec<crate::Agent>> {
2459            Ok(Vec::new())
2460        }
2461        async fn get_agent_by_id(&self, _id: AgentId) -> crate::Result<Option<crate::Agent>> {
2462            Ok(None)
2463        }
2464        async fn create_agent(
2465            &self,
2466            _name: &str,
2467            _display_name: Option<&str>,
2468            _description: Option<&str>,
2469            _system_prompt: &str,
2470            _capabilities: &[String],
2471        ) -> crate::Result<crate::Agent> {
2472            unreachable!()
2473        }
2474        async fn update_agent(
2475            &self,
2476            _id: AgentId,
2477            _name: Option<&str>,
2478            _display_name: Option<&str>,
2479            _description: Option<&str>,
2480            _system_prompt: Option<&str>,
2481        ) -> crate::Result<crate::Agent> {
2482            unreachable!()
2483        }
2484        async fn delete_agent(&self, _id: AgentId) -> crate::Result<()> {
2485            Ok(())
2486        }
2487        async fn list_apps(
2488            &self,
2489            _search: Option<&str>,
2490            _include_archived: bool,
2491        ) -> crate::Result<Vec<crate::App>> {
2492            Ok(Vec::new())
2493        }
2494        async fn get_app(&self, _id: crate::AppId) -> crate::Result<Option<crate::App>> {
2495            Ok(None)
2496        }
2497        async fn create_app(
2498            &self,
2499            _name: &str,
2500            _description: Option<&str>,
2501            _harness_id: HarnessId,
2502            _agent_id: Option<AgentId>,
2503            _agent_identity_id: Option<crate::AgentIdentityId>,
2504            _channel_type: Option<crate::ChannelType>,
2505            _channel_config: Option<&serde_json::Value>,
2506        ) -> crate::Result<crate::App> {
2507            unreachable!()
2508        }
2509        async fn update_app(
2510            &self,
2511            _id: crate::AppId,
2512            _name: Option<&str>,
2513            _description: Option<&str>,
2514            _harness_id: Option<HarnessId>,
2515            _agent_id: Option<AgentId>,
2516            _agent_identity_id: Option<Option<crate::AgentIdentityId>>,
2517        ) -> crate::Result<crate::App> {
2518            unreachable!()
2519        }
2520        async fn delete_app(&self, _id: crate::AppId) -> crate::Result<()> {
2521            Ok(())
2522        }
2523        async fn destroy_app(&self, _id: crate::AppId) -> crate::Result<()> {
2524            Ok(())
2525        }
2526        async fn publish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
2527            unreachable!()
2528        }
2529        async fn unpublish_app(&self, _id: crate::AppId) -> crate::Result<crate::App> {
2530            unreachable!()
2531        }
2532        async fn add_app_channel(
2533            &self,
2534            _app_id: crate::AppId,
2535            _channel_type: crate::ChannelType,
2536            _channel_config: Option<&serde_json::Value>,
2537            _enabled: Option<bool>,
2538        ) -> crate::Result<crate::AppChannel> {
2539            unreachable!()
2540        }
2541        async fn update_app_channel(
2542            &self,
2543            _app_id: crate::AppId,
2544            _channel_id: crate::AppChannelId,
2545            _channel_type: Option<crate::ChannelType>,
2546            _channel_config: Option<&serde_json::Value>,
2547            _enabled: Option<bool>,
2548        ) -> crate::Result<crate::AppChannel> {
2549            unreachable!()
2550        }
2551        async fn delete_app_channel(
2552            &self,
2553            _app_id: crate::AppId,
2554            _channel_id: crate::AppChannelId,
2555        ) -> crate::Result<()> {
2556            Ok(())
2557        }
2558        async fn list_sessions(
2559            &self,
2560            _limit: Option<usize>,
2561            _agent_id: Option<AgentId>,
2562        ) -> crate::Result<Vec<crate::Session>> {
2563            Ok(Vec::new())
2564        }
2565        async fn create_session(
2566            &self,
2567            _harness_id: HarnessId,
2568            _agent_id: Option<AgentId>,
2569            _title: Option<&str>,
2570            _locale: Option<&str>,
2571            _blueprint_id: Option<&str>,
2572            _blueprint_config: Option<&serde_json::Value>,
2573            _parent_session_id: Option<SessionId>,
2574        ) -> crate::Result<crate::Session> {
2575            unreachable!()
2576        }
2577        async fn get_session_by_id(&self, _id: SessionId) -> crate::Result<Option<crate::Session>> {
2578            Ok(None)
2579        }
2580        async fn get_session_context_report(
2581            &self,
2582            id: SessionId,
2583        ) -> crate::Result<crate::SessionContextReport> {
2584            Ok(crate::SessionContextReport {
2585                session_id: id.to_string(),
2586                model: "llmsim".to_string(),
2587                context_window_tokens: None,
2588                estimated_input_tokens: 0,
2589                sections: vec![],
2590                contributions: vec![],
2591                cumulative_usage: None,
2592            })
2593        }
2594        async fn delete_session(&self, _id: SessionId) -> crate::Result<()> {
2595            Ok(())
2596        }
2597        async fn send_message(&self, _session_id: SessionId, content: &str) -> crate::Result<()> {
2598            self.sent_messages.lock().unwrap().push(content.to_string());
2599            Ok(())
2600        }
2601        async fn get_messages(
2602            &self,
2603            _session_id: SessionId,
2604            _limit: Option<usize>,
2605        ) -> crate::Result<Vec<PlatformMessage>> {
2606            Ok(Vec::new())
2607        }
2608        async fn wait_for_idle(
2609            &self,
2610            _session_id: SessionId,
2611            _timeout_secs: Option<u64>,
2612        ) -> crate::Result<String> {
2613            Ok("idle".to_string())
2614        }
2615        async fn list_capabilities(
2616            &self,
2617            _search: Option<&str>,
2618        ) -> crate::Result<Vec<crate::CapabilityInfo>> {
2619            Ok(Vec::new())
2620        }
2621        fn base_url(&self) -> &str {
2622            "http://localhost:9300"
2623        }
2624    }
2625
2626    #[derive(Default)]
2627    struct NoopStorageStore;
2628
2629    #[derive(Default)]
2630    struct TestScheduleStore {
2631        schedules: Mutex<Vec<crate::session_schedule::SessionSchedule>>,
2632    }
2633
2634    #[async_trait]
2635    impl crate::traits::SessionScheduleStore for TestScheduleStore {
2636        async fn create_schedule(
2637            &self,
2638            session_id: SessionId,
2639            description: String,
2640            cron_expression: Option<String>,
2641            scheduled_at: Option<chrono::DateTime<chrono::Utc>>,
2642            timezone: String,
2643        ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2644            let schedule = crate::session_schedule::SessionSchedule {
2645                id: crate::typed_id::ScheduleId::new(),
2646                session_id,
2647                owner_principal_id: crate::PrincipalId::from_seed(1),
2648                resolved_owner_user_id: None,
2649                owner: None,
2650                effective_owner: None,
2651                description,
2652                cron_expression: cron_expression.clone(),
2653                scheduled_at,
2654                timezone,
2655                enabled: true,
2656                schedule_type: crate::session_schedule::SessionSchedule::derive_type(
2657                    &cron_expression,
2658                ),
2659                next_trigger_at: Some(chrono::Utc::now() + chrono::Duration::minutes(10)),
2660                last_triggered_at: None,
2661                trigger_count: 0,
2662                created_at: chrono::Utc::now(),
2663                updated_at: chrono::Utc::now(),
2664            };
2665            self.schedules.lock().unwrap().push(schedule.clone());
2666            Ok(schedule)
2667        }
2668
2669        async fn cancel_schedule(
2670            &self,
2671            _session_id: SessionId,
2672            schedule_id: crate::ScheduleId,
2673        ) -> crate::Result<crate::session_schedule::SessionSchedule> {
2674            let mut schedules = self.schedules.lock().unwrap();
2675            let schedule = schedules
2676                .iter_mut()
2677                .find(|schedule| schedule.id == schedule_id)
2678                .ok_or_else(|| crate::AgentLoopError::tool("Schedule not found".to_string()))?;
2679            schedule.enabled = false;
2680            Ok(schedule.clone())
2681        }
2682
2683        async fn list_schedules(
2684            &self,
2685            session_id: SessionId,
2686        ) -> crate::Result<Vec<crate::session_schedule::SessionSchedule>> {
2687            Ok(self
2688                .schedules
2689                .lock()
2690                .unwrap()
2691                .iter()
2692                .filter(|schedule| schedule.session_id == session_id)
2693                .cloned()
2694                .collect())
2695        }
2696
2697        async fn count_active_schedules(&self, session_id: SessionId) -> crate::Result<u32> {
2698            Ok(self
2699                .schedules
2700                .lock()
2701                .unwrap()
2702                .iter()
2703                .filter(|schedule| schedule.session_id == session_id && schedule.enabled)
2704                .count() as u32)
2705        }
2706    }
2707
2708    #[async_trait]
2709    impl crate::traits::SessionStorageStore for NoopStorageStore {
2710        async fn set_value(
2711            &self,
2712            _session_id: SessionId,
2713            _key: &str,
2714            _value: &str,
2715        ) -> crate::Result<()> {
2716            Ok(())
2717        }
2718        async fn get_value(
2719            &self,
2720            _session_id: SessionId,
2721            _key: &str,
2722        ) -> crate::Result<Option<String>> {
2723            Ok(None)
2724        }
2725        async fn delete_value(&self, _session_id: SessionId, _key: &str) -> crate::Result<bool> {
2726            Ok(false)
2727        }
2728        async fn list_keys(&self, _session_id: SessionId) -> crate::Result<Vec<KeyInfo>> {
2729            Ok(Vec::new())
2730        }
2731        async fn set_secret(
2732            &self,
2733            _session_id: SessionId,
2734            _name: &str,
2735            _value: &str,
2736        ) -> crate::Result<()> {
2737            Ok(())
2738        }
2739        async fn get_secret(
2740            &self,
2741            _session_id: SessionId,
2742            _name: &str,
2743        ) -> crate::Result<Option<String>> {
2744            Ok(None)
2745        }
2746        async fn delete_secret(&self, _session_id: SessionId, _name: &str) -> crate::Result<bool> {
2747            Ok(false)
2748        }
2749        async fn list_secrets(&self, _session_id: SessionId) -> crate::Result<Vec<SecretInfo>> {
2750            Ok(Vec::new())
2751        }
2752    }
2753
2754    #[tokio::test]
2755    async fn test_echo_tool() {
2756        let tool = EchoTool;
2757
2758        let result = tool
2759            .execute(serde_json::json!({"message": "Hello, world!"}))
2760            .await;
2761
2762        if let ToolExecutionResult::Success(value) = result {
2763            assert_eq!(
2764                value.get("echoed").unwrap().as_str().unwrap(),
2765                "Hello, world!"
2766            );
2767            assert_eq!(value.get("length").unwrap().as_u64().unwrap(), 13);
2768        } else {
2769            panic!("Expected success");
2770        }
2771    }
2772
2773    #[tokio::test]
2774    async fn test_failing_tool_with_tool_error() {
2775        let tool = FailingTool::with_tool_error("Something went wrong");
2776
2777        let result = tool.execute(serde_json::json!({})).await;
2778
2779        if let ToolExecutionResult::ToolError(msg) = result {
2780            assert_eq!(msg, "Something went wrong");
2781        } else {
2782            panic!("Expected tool error");
2783        }
2784    }
2785
2786    #[tokio::test]
2787    async fn test_failing_tool_with_internal_error() {
2788        let tool = FailingTool::with_internal_error("Database connection failed");
2789
2790        let result = tool.execute(serde_json::json!({})).await;
2791
2792        if let ToolExecutionResult::InternalError(err) = result {
2793            assert_eq!(err.message, "Database connection failed");
2794        } else {
2795            panic!("Expected internal error");
2796        }
2797    }
2798
2799    #[tokio::test]
2800    async fn test_tool_result_conversion() {
2801        // Success
2802        let result = ToolExecutionResult::success(serde_json::json!({"value": 42}));
2803        let tool_result = result.into_tool_result("call_1", "test_tool");
2804        assert!(tool_result.error.is_none());
2805        assert_eq!(tool_result.result.unwrap()["value"], 42);
2806
2807        // Tool error (packaged as {"error": "..."} in result field, also sets error)
2808        let result = ToolExecutionResult::tool_error("Invalid input");
2809        let tool_result = result.into_tool_result("call_2", "test_tool");
2810        assert_eq!(tool_result.error.as_deref(), Some("Invalid input"));
2811        assert_eq!(
2812            tool_result.result.unwrap(),
2813            serde_json::json!({"error": "Invalid input"})
2814        );
2815
2816        // Internal error (packaged as {"error": "..."} with generic message)
2817        let result = ToolExecutionResult::internal_error_msg("Secret database error");
2818        let tool_result = result.into_tool_result("call_3", "test_tool");
2819        assert_eq!(
2820            tool_result.error.as_deref(),
2821            Some("An internal error occurred while executing the tool")
2822        );
2823        assert_eq!(
2824            tool_result.result.unwrap(),
2825            serde_json::json!({"error": "An internal error occurred while executing the tool"})
2826        );
2827    }
2828
2829    #[tokio::test]
2830    async fn test_tool_registry() {
2831        let mut registry = ToolRegistry::new();
2832        registry.register(GetCurrentTimeTool);
2833        registry.register(EchoTool);
2834
2835        assert_eq!(registry.len(), 2);
2836        assert!(registry.has("get_current_time"));
2837        assert!(registry.has("echo"));
2838        assert!(!registry.has("nonexistent"));
2839
2840        let definitions = registry.tool_definitions();
2841        assert_eq!(definitions.len(), 2);
2842    }
2843
2844    #[tokio::test]
2845    async fn test_tool_registry_builder() {
2846        let registry = ToolRegistry::builder()
2847            .tool(GetCurrentTimeTool)
2848            .tool(EchoTool)
2849            .build();
2850
2851        assert_eq!(registry.len(), 2);
2852    }
2853
2854    #[test]
2855    fn test_tool_display_name_in_definition() {
2856        // GetCurrentTimeTool has display_name "Get Current Time"
2857        let tool = GetCurrentTimeTool;
2858        assert_eq!(tool.display_name(), Some("Get Current Time"));
2859
2860        let def = tool.to_definition();
2861        assert_eq!(def.display_name(), Some("Get Current Time"));
2862    }
2863
2864    #[test]
2865    fn test_success_with_raw_output_object_preserves_shape() {
2866        let res = ToolExecutionResult::success_with_raw_output(
2867            serde_json::json!({"stdout": "hello"}),
2868            "raw stdout bytes".to_string(),
2869        );
2870        let tr = res.into_tool_result("call_1", "demo");
2871        assert_eq!(tr.result.as_ref().unwrap()["stdout"], "hello");
2872        assert!(
2873            tr.result
2874                .as_ref()
2875                .unwrap()
2876                .as_object()
2877                .unwrap()
2878                .get("_raw_output")
2879                .is_none(),
2880            "sidecar key must not leak to the LLM-visible result"
2881        );
2882        assert_eq!(tr.raw_output.as_deref(), Some("raw stdout bytes"));
2883    }
2884
2885    #[test]
2886    fn test_success_with_raw_output_scalar_unwraps_to_string() {
2887        let res = ToolExecutionResult::success_with_raw_output(
2888            "compact summary".to_string(),
2889            "full output bytes".to_string(),
2890        );
2891        let tr = res.into_tool_result("call_1", "demo");
2892        assert_eq!(
2893            tr.result,
2894            Some(serde_json::Value::String("compact summary".into()))
2895        );
2896        assert_eq!(tr.raw_output.as_deref(), Some("full output bytes"));
2897    }
2898
2899    #[test]
2900    fn test_success_result_with_raw_output_scalar_key_is_not_unwrapped() {
2901        let res = ToolExecutionResult::success(
2902            serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}),
2903        );
2904        let tr = res.into_tool_result("call_1", "demo");
2905        assert_eq!(
2906            tr.result,
2907            Some(serde_json::json!({"_raw_output_scalar": "user_value", "kept": true}))
2908        );
2909        assert_eq!(tr.raw_output, None);
2910    }
2911
2912    #[test]
2913    fn test_success_result_with_only_raw_output_scalar_key_is_not_unwrapped() {
2914        // Single-key object with _raw_output_scalar must not be mistaken for a
2915        // success_with_raw_output carrier when raw_output is absent.
2916        let res = ToolExecutionResult::success(serde_json::json!({"_raw_output_scalar": "v"}));
2917        let tr = res.into_tool_result("call_1", "demo");
2918        assert_eq!(
2919            tr.result,
2920            Some(serde_json::json!({"_raw_output_scalar": "v"}))
2921        );
2922        assert_eq!(tr.raw_output, None);
2923    }
2924
2925    #[test]
2926    fn test_echo_tool_display_name() {
2927        let tool = EchoTool;
2928        assert_eq!(tool.display_name(), Some("Echo"));
2929
2930        let def = tool.to_definition();
2931        assert_eq!(def.display_name(), Some("Echo"));
2932    }
2933
2934    #[test]
2935    fn test_all_default_tools_have_display_names() {
2936        let registry = ToolRegistry::with_defaults();
2937        let definitions = registry.tool_definitions();
2938
2939        for def in &definitions {
2940            assert!(
2941                def.display_name().is_some(),
2942                "Tool '{}' should have a display_name",
2943                def.name()
2944            );
2945        }
2946    }
2947
2948    #[tokio::test]
2949    async fn test_tool_registry_as_executor() {
2950        let mut registry = ToolRegistry::new();
2951        registry.register(EchoTool);
2952
2953        let tool_call = ToolCall {
2954            id: "call_1".to_string(),
2955            name: "echo".to_string(),
2956            arguments: serde_json::json!({"message": "test"}),
2957        };
2958
2959        let tool_def = registry.get("echo").unwrap().to_definition();
2960        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
2961
2962        assert!(result.error.is_none());
2963        assert_eq!(result.result.unwrap()["echoed"], "test");
2964    }
2965
2966    #[test]
2967    fn test_tool_to_definition() {
2968        let tool = GetCurrentTimeTool;
2969        let def = tool.to_definition();
2970
2971        let ToolDefinition::Builtin(builtin) = def else {
2972            panic!("expected Builtin variant");
2973        };
2974        assert_eq!(builtin.name, "get_current_time");
2975        assert_eq!(builtin.policy, ToolPolicy::Auto);
2976    }
2977
2978    #[test]
2979    fn test_with_defaults_has_expected_tools() {
2980        let registry = ToolRegistry::with_defaults();
2981
2982        // Core tools
2983        assert!(
2984            registry.has("get_current_time"),
2985            "should have get_current_time"
2986        );
2987        assert!(registry.has("echo"), "should have echo");
2988        // spawn_background is contributed by the background_execution
2989        // capability (auto-activated) — it must NOT be in defaults.
2990        assert!(
2991            !registry.has("spawn_background"),
2992            "spawn_background must NOT be in defaults — it comes from the \
2993             background_execution capability"
2994        );
2995        assert!(
2996            registry.has("report_progress"),
2997            "should have report_progress"
2998        );
2999
3000        // TestMath capability tools
3001        assert!(registry.has("add"), "should have add");
3002        assert!(registry.has("subtract"), "should have subtract");
3003        assert!(registry.has("multiply"), "should have multiply");
3004        assert!(registry.has("divide"), "should have divide");
3005
3006        // TestWeather capability tools
3007        assert!(registry.has("get_weather"), "should have get_weather");
3008        assert!(registry.has("get_forecast"), "should have get_forecast");
3009
3010        // TaskList capability tools
3011        assert!(registry.has("write_todos"), "should have write_todos");
3012
3013        // FileSystem capability tools
3014        assert!(registry.has("read_file"), "should have read_file");
3015        assert!(registry.has("write_file"), "should have write_file");
3016        assert!(registry.has("edit_file"), "should have edit_file");
3017        assert!(registry.has("list_directory"), "should have list_directory");
3018        assert!(registry.has("grep_files"), "should have grep_files");
3019        assert!(registry.has("delete_file"), "should have delete_file");
3020        assert!(registry.has("stat_file"), "should have stat_file");
3021
3022        // WebFetch capability tools
3023        assert!(registry.has("web_fetch"), "should have web_fetch");
3024
3025        // Total count: 19 - 1 (spawn_background, moved to capability) = 18
3026        assert_eq!(registry.len(), 18, "should have 18 default tools");
3027    }
3028
3029    #[tokio::test]
3030    async fn test_with_defaults_tools_are_executable() {
3031        let registry = ToolRegistry::with_defaults();
3032
3033        // Test echo tool execution
3034        let tool_call = ToolCall {
3035            id: "call_1".to_string(),
3036            name: "echo".to_string(),
3037            arguments: serde_json::json!({"message": "hello from defaults"}),
3038        };
3039
3040        let tool_def = registry.get("echo").unwrap().to_definition();
3041        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
3042
3043        assert!(result.error.is_none());
3044        assert_eq!(result.result.unwrap()["echoed"], "hello from defaults");
3045    }
3046
3047    #[tokio::test]
3048    async fn test_with_defaults_math_tools() {
3049        let registry = ToolRegistry::with_defaults();
3050
3051        // Test add tool
3052        let tool_call = ToolCall {
3053            id: "call_add".to_string(),
3054            name: "add".to_string(),
3055            arguments: serde_json::json!({"a": 5, "b": 3}),
3056        };
3057
3058        let tool_def = registry.get("add").unwrap().to_definition();
3059        let result = registry.execute(&tool_call, &tool_def).await.unwrap();
3060
3061        assert!(result.error.is_none());
3062        // AddTool returns floats, so compare as f64
3063        assert_eq!(result.result.unwrap()["result"].as_f64().unwrap(), 8.0);
3064    }
3065
3066    /// Regression: with_defaults() must NOT include capability-provided tools like
3067    /// 'bash'. These tools come from capabilities and must be registered separately.
3068    /// If bash were in defaults, the harness capability fallback would be masked.
3069    #[test]
3070    fn test_with_defaults_excludes_capability_only_tools() {
3071        let registry = ToolRegistry::with_defaults();
3072
3073        // bash comes from bashkit_shell capability, not defaults
3074        assert!(
3075            !registry.has("bash"),
3076            "bash must not be in defaults — it comes from bashkit_shell capability"
3077        );
3078        // kv_store/secret_store come from session_storage capability
3079        assert!(
3080            !registry.has("kv_store"),
3081            "kv_store must not be in defaults — it comes from session_storage capability"
3082        );
3083        // spawn_background comes from background_execution capability and is
3084        // auto-activated by `collect_capabilities_with_configs` when a
3085        // background-capable tool is present (see EVE-501).
3086        assert!(
3087            !registry.has("spawn_background"),
3088            "spawn_background must not be in defaults — it comes from the \
3089             background_execution capability (auto-activated by tool hints)"
3090        );
3091    }
3092
3093    #[tokio::test]
3094    async fn test_spawn_background_executes_and_signals_session() {
3095        let session_id = SessionId::new();
3096        let file_store = Arc::new(TestFileStore::default());
3097        let platform_store = Arc::new(TestPlatformStore::default());
3098        let storage_store = Arc::new(NoopStorageStore);
3099        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3100        let tool_registry = ToolRegistry::builder()
3101            .tool(SpawnBackgroundTool)
3102            .tool(TestBackgroundTool)
3103            .build();
3104
3105        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3106            .with_tool_registry(Arc::new(tool_registry))
3107            .with_platform_store(platform_store.clone())
3108            .with_session_task_registry(task_registry.clone());
3109
3110        let tool = SpawnBackgroundTool;
3111        let result = tool
3112            .execute_with_context(
3113                json!({
3114                    "tool": "test_background",
3115                    "args": { "summary": "Background complete" }
3116                }),
3117                &context,
3118            )
3119            .await;
3120
3121        let ToolExecutionResult::Success(value) = result else {
3122            panic!("spawn_background should succeed");
3123        };
3124        let run_id = value["run_id"].as_str().unwrap().to_string();
3125        let task_id = value["task_id"].as_str().unwrap().to_string();
3126
3127        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3128            loop {
3129                if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3130                    && task.state == crate::session_task::SessionTaskState::Succeeded
3131                {
3132                    break task;
3133                }
3134                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3135            }
3136        })
3137        .await
3138        .expect("background run should complete");
3139        let _ = run_id; // still available in result json
3140
3141        let messages = platform_store.sent_messages.lock().unwrap().clone();
3142        assert_eq!(messages.len(), 1);
3143        assert!(messages[0].contains("Background run completed"));
3144
3145        let log_file = file_store
3146            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3147            .await
3148            .unwrap()
3149            .expect("log file");
3150        assert!(
3151            log_file
3152                .content
3153                .as_deref()
3154                .unwrap_or_default()
3155                .contains("hello from background")
3156        );
3157    }
3158
3159    #[tokio::test]
3160    async fn test_spawn_background_persists_failure_artifacts() {
3161        let session_id = SessionId::new();
3162        let file_store = Arc::new(TestFileStore::default());
3163        let storage_store = Arc::new(NoopStorageStore);
3164        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3165        let tool_registry = ToolRegistry::builder()
3166            .tool(SpawnBackgroundTool)
3167            .tool(TestFailingBackgroundTool)
3168            .build();
3169
3170        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3171            .with_tool_registry(Arc::new(tool_registry))
3172            .with_session_task_registry(task_registry.clone());
3173
3174        let result = SpawnBackgroundTool
3175            .execute_with_context(
3176                json!({
3177                    "tool": "test_background_fail",
3178                    "args": {}
3179                }),
3180                &context,
3181            )
3182            .await;
3183
3184        let ToolExecutionResult::Success(value) = result else {
3185            panic!("spawn_background should succeed");
3186        };
3187        let run_id = value["run_id"].as_str().unwrap().to_string();
3188        let task_id = value["task_id"].as_str().unwrap().to_string();
3189
3190        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3191            loop {
3192                if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3193                    && task.state == crate::session_task::SessionTaskState::Failed
3194                {
3195                    break task;
3196                }
3197                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3198            }
3199        })
3200        .await
3201        .expect("background run should fail");
3202        let _ = run_id;
3203
3204        let log_file = file_store
3205            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3206            .await
3207            .unwrap()
3208            .expect("log file");
3209        assert!(
3210            log_file
3211                .content
3212                .as_deref()
3213                .unwrap_or_default()
3214                .contains("background failed")
3215        );
3216
3217        let result_file = file_store
3218            .read_file(session_id, &format!("/.background/{run_id}/result.json"))
3219            .await
3220            .unwrap()
3221            .expect("result file");
3222        let result_json: Value =
3223            serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
3224                .expect("valid json");
3225        assert_eq!(result_json["status"], "failed");
3226        assert_eq!(result_json["error"], "boom");
3227    }
3228
3229    #[tokio::test]
3230    async fn test_spawn_background_rejects_when_session_active_run_limit_reached() {
3231        let session_id = SessionId::new();
3232        let file_store = Arc::new(TestFileStore::default());
3233        let storage_store = Arc::new(NoopStorageStore);
3234        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3235        let release = StdArc::new(AtomicBool::new(false));
3236        let tool_registry = ToolRegistry::builder()
3237            .tool(SpawnBackgroundTool)
3238            .tool(BlockingBackgroundTool {
3239                release: release.clone(),
3240            })
3241            .build();
3242
3243        let context = ToolContext::with_stores(session_id, file_store, storage_store)
3244            .with_tool_registry(Arc::new(tool_registry))
3245            .with_session_task_registry(task_registry.clone());
3246
3247        let mut task_ids = Vec::new();
3248        for _ in 0..MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
3249            let result = SpawnBackgroundTool
3250                .execute_with_context(
3251                    json!({
3252                        "tool": "test_background_blocking",
3253                        "args": {}
3254                    }),
3255                    &context,
3256                )
3257                .await;
3258
3259            let ToolExecutionResult::Success(value) = result else {
3260                panic!("background run below the session limit should start");
3261            };
3262            task_ids.push(value["task_id"].as_str().unwrap().to_string());
3263        }
3264
3265        // Wait for all tasks to be running (semaphore acquired).
3266        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3267            loop {
3268                let running = task_registry
3269                    .list(
3270                        session_id,
3271                        Some(&crate::session_task::SessionTaskFilter {
3272                            kind: Some(crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string()),
3273                            state: Some(crate::session_task::SessionTaskState::Running),
3274                        }),
3275                    )
3276                    .await
3277                    .unwrap();
3278                if running.len() == MAX_ACTIVE_BACKGROUND_RUNS_PER_SESSION {
3279                    break;
3280                }
3281                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3282            }
3283        })
3284        .await
3285        .expect("background runs should become running");
3286
3287        let result = SpawnBackgroundTool
3288            .execute_with_context(
3289                json!({
3290                    "tool": "test_background_blocking",
3291                    "args": {}
3292                }),
3293                &context,
3294            )
3295            .await;
3296
3297        let ToolExecutionResult::ToolError(message) = result else {
3298            release.store(true, Ordering::SeqCst);
3299            panic!("spawn_background should reject once the session limit is reached");
3300        };
3301        assert!(message.contains("active background runs per session"));
3302
3303        release.store(true, Ordering::SeqCst);
3304        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3305            for task_id in task_ids {
3306                loop {
3307                    if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3308                        && task.state == crate::session_task::SessionTaskState::Succeeded
3309                    {
3310                        break;
3311                    }
3312                    tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3313                }
3314            }
3315        })
3316        .await
3317        .expect("blocking background runs should complete after release");
3318
3319        // The permit drop is enqueued in the spawned task after it marks the
3320        // resource Completed, so the cache entry may still exist briefly once
3321        // we observe Completed status.  Poll until pruned rather than asserting
3322        // immediately to avoid a race.
3323        tokio::time::timeout(std::time::Duration::from_secs(1), async {
3324            loop {
3325                if !has_session_background_permits(session_id) {
3326                    break;
3327                }
3328                tokio::time::sleep(std::time::Duration::from_millis(5)).await;
3329            }
3330        })
3331        .await
3332        .expect("completed background runs should prune their per-session permit cache entry");
3333    }
3334
3335    #[tokio::test]
3336    async fn test_spawn_background_requires_task_registry() {
3337        let session_id = SessionId::new();
3338        let file_store = Arc::new(TestFileStore::default());
3339        let storage_store = Arc::new(NoopStorageStore);
3340        let tool_registry = ToolRegistry::builder()
3341            .tool(SpawnBackgroundTool)
3342            .tool(TestBackgroundTool)
3343            .build();
3344
3345        // No task_registry wired — should fail.
3346        let context = ToolContext::with_stores(session_id, file_store, storage_store)
3347            .with_tool_registry(Arc::new(tool_registry));
3348
3349        let result = SpawnBackgroundTool
3350            .execute_with_context(
3351                json!({
3352                    "tool": "test_background",
3353                    "args": {}
3354                }),
3355                &context,
3356            )
3357            .await;
3358
3359        let ToolExecutionResult::ToolError(message) = result else {
3360            panic!("spawn_background should reject missing task registry");
3361        };
3362        assert!(message.contains("Session task registry not available"));
3363    }
3364
3365    #[tokio::test]
3366    async fn test_spawn_background_requires_file_store() {
3367        let session_id = SessionId::new();
3368        let storage_store = Arc::new(NoopStorageStore);
3369        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3370        let tool_registry = ToolRegistry::builder()
3371            .tool(SpawnBackgroundTool)
3372            .tool(TestBackgroundTool)
3373            .build();
3374
3375        // No file_store wired — should fail.
3376        let context = ToolContext::with_storage_store(session_id, storage_store)
3377            .with_tool_registry(Arc::new(tool_registry))
3378            .with_session_task_registry(task_registry);
3379
3380        let result = SpawnBackgroundTool
3381            .execute_with_context(
3382                json!({
3383                    "tool": "test_background",
3384                    "args": {}
3385                }),
3386                &context,
3387            )
3388            .await;
3389
3390        let ToolExecutionResult::ToolError(message) = result else {
3391            panic!("spawn_background should reject missing file store");
3392        };
3393        assert!(message.contains("Session file store not available"));
3394    }
3395
3396    #[tokio::test]
3397    async fn test_spawn_background_caps_output_log_size() {
3398        let session_id = SessionId::new();
3399        let file_store = Arc::new(TestFileStore::default());
3400        let storage_store = Arc::new(NoopStorageStore);
3401        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3402        let tool_registry = ToolRegistry::builder()
3403            .tool(SpawnBackgroundTool)
3404            .tool(TestLargeOutputBackgroundTool)
3405            .build();
3406
3407        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3408            .with_tool_registry(Arc::new(tool_registry))
3409            .with_session_task_registry(task_registry.clone());
3410
3411        let result = SpawnBackgroundTool
3412            .execute_with_context(
3413                json!({
3414                    "tool": "test_background_large_output",
3415                    "args": {}
3416                }),
3417                &context,
3418            )
3419            .await;
3420
3421        let ToolExecutionResult::Success(value) = result else {
3422            panic!("spawn_background should succeed");
3423        };
3424        let run_id = value["run_id"].as_str().unwrap().to_string();
3425        let task_id = value["task_id"].as_str().unwrap().to_string();
3426
3427        tokio::time::timeout(std::time::Duration::from_secs(2), async {
3428            loop {
3429                if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3430                    && task.state == crate::session_task::SessionTaskState::Succeeded
3431                {
3432                    break;
3433                }
3434                tokio::time::sleep(std::time::Duration::from_millis(20)).await;
3435            }
3436        })
3437        .await
3438        .expect("background run should complete");
3439        let _ = run_id;
3440
3441        let log_content = file_store
3442            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3443            .await
3444            .unwrap()
3445            .expect("log file")
3446            .content
3447            .unwrap_or_default();
3448
3449        assert!(log_content.contains("[system] background output truncated"));
3450        assert!(log_content.chars().count() <= MAX_BACKGROUND_OUTPUT_LOG_CHARS + 128);
3451    }
3452
3453    #[tokio::test]
3454    async fn test_spawn_background_can_create_scheduled_monitor() {
3455        let session_id = SessionId::new();
3456        let schedule_store = Arc::new(TestScheduleStore::default());
3457        let storage_store = Arc::new(NoopStorageStore);
3458        let tool_registry = ToolRegistry::builder()
3459            .tool(SpawnBackgroundTool)
3460            .tool(TestBackgroundTool)
3461            .build();
3462
3463        let context = ToolContext::with_storage_store(session_id, storage_store)
3464            .with_tool_registry(Arc::new(tool_registry))
3465            .with_schedule_store(schedule_store.clone());
3466
3467        let result = SpawnBackgroundTool
3468            .execute_with_context(
3469                json!({
3470                    "tool": "test_background",
3471                    "title": "Watch PR 1319",
3472                    "args": { "summary": "Background complete" },
3473                    "schedule": {
3474                        "cron_expression": "*/10 * * * *",
3475                        "timezone": "America/Chicago"
3476                    }
3477                }),
3478                &context,
3479            )
3480            .await;
3481
3482        let ToolExecutionResult::Success(value) = result else {
3483            panic!("spawn_background should create a schedule: {result:?}");
3484        };
3485
3486        assert_eq!(value["status"], "scheduled");
3487        assert_eq!(value["title"], "Watch PR 1319");
3488        assert_eq!(value["cron_expression"], "*/10 * * * *");
3489        assert_eq!(value["timezone"], "America/Chicago");
3490
3491        let schedules = schedule_store.list_schedules(session_id).await.unwrap();
3492        assert_eq!(schedules.len(), 1);
3493        assert_eq!(
3494            schedules[0].cron_expression.as_deref(),
3495            Some("*/10 * * * *")
3496        );
3497        assert!(schedules[0].description.contains("Monitor: Watch PR 1319"));
3498        assert!(
3499            schedules[0]
3500                .description
3501                .contains("\"summary\": \"Background complete\"")
3502        );
3503    }
3504
3505    #[tokio::test]
3506    async fn test_spawn_background_rejects_invalid_scheduled_at() {
3507        let session_id = SessionId::new();
3508        let storage_store = Arc::new(NoopStorageStore);
3509        let tool_registry = ToolRegistry::builder()
3510            .tool(SpawnBackgroundTool)
3511            .tool(TestBackgroundTool)
3512            .build();
3513        let context = ToolContext::with_storage_store(session_id, storage_store)
3514            .with_tool_registry(Arc::new(tool_registry));
3515
3516        let result = SpawnBackgroundTool
3517            .execute_with_context(
3518                json!({
3519                    "tool": "test_background",
3520                    "args": {},
3521                    "schedule": {
3522                        "scheduled_at": "tomorrow at noon"
3523                    }
3524                }),
3525                &context,
3526            )
3527            .await;
3528
3529        let ToolExecutionResult::ToolError(message) = result else {
3530            panic!("spawn_background should reject invalid scheduled_at");
3531        };
3532        assert!(message.contains("scheduled_at must be RFC3339"));
3533    }
3534
3535    #[tokio::test]
3536    async fn test_spawn_background_rejects_ambiguous_schedule_shape() {
3537        let session_id = SessionId::new();
3538        let storage_store = Arc::new(NoopStorageStore);
3539        let tool_registry = ToolRegistry::builder()
3540            .tool(SpawnBackgroundTool)
3541            .tool(TestBackgroundTool)
3542            .build();
3543        let context = ToolContext::with_storage_store(session_id, storage_store)
3544            .with_tool_registry(Arc::new(tool_registry));
3545
3546        let result = SpawnBackgroundTool
3547            .execute_with_context(
3548                json!({
3549                    "tool": "test_background",
3550                    "args": {},
3551                    "schedule": {
3552                        "cron_expression": "*/10 * * * *",
3553                        "scheduled_at": "2026-04-16T15:30:00Z"
3554                    }
3555                }),
3556                &context,
3557            )
3558            .await;
3559
3560        let ToolExecutionResult::ToolError(message) = result else {
3561            panic!("spawn_background should reject ambiguous schedule shape");
3562        };
3563        assert!(message.contains("must not include both cron_expression and scheduled_at"));
3564    }
3565
3566    // =========================================================================
3567    // Cooperative cancellation tests
3568    // =========================================================================
3569
3570    #[test]
3571    fn test_is_canceled_outcome_detects_sentinel() {
3572        // The sentinel produced by the cancel-watcher branch.
3573        let sentinel: std::result::Result<BackgroundOutcome, ToolExecutionResult> = Err(
3574            ToolExecutionResult::ToolError(BACKGROUND_CANCEL_SENTINEL.to_string()),
3575        );
3576        assert!(is_canceled_outcome(&sentinel));
3577    }
3578
3579    #[test]
3580    fn test_is_canceled_outcome_does_not_match_other_errors() {
3581        let other_err: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
3582            Err(ToolExecutionResult::ToolError("boom".to_string()));
3583        assert!(!is_canceled_outcome(&other_err));
3584
3585        let success: std::result::Result<BackgroundOutcome, ToolExecutionResult> =
3586            Ok(BackgroundOutcome {
3587                summary: "ok".to_string(),
3588                result: json!({"ok": true}),
3589                raw_output: None,
3590            });
3591        assert!(!is_canceled_outcome(&success));
3592    }
3593
3594    /// A background tool that sleeps indefinitely, allowing the test to exercise
3595    /// cancel via the cancel-watcher without actually waiting forever.
3596    #[derive(Default)]
3597    struct SleepingBackgroundTool;
3598
3599    #[async_trait]
3600    impl BackgroundExecutableTool for SleepingBackgroundTool {
3601        async fn execute_background(
3602            &self,
3603            _arguments: Value,
3604            _context: ToolContext,
3605            sink: Arc<dyn BackgroundEventSink>,
3606        ) -> std::result::Result<BackgroundOutcome, ToolExecutionResult> {
3607            sink.status("Sleeping forever")
3608                .await
3609                .map_err(ToolExecutionResult::internal_error)?;
3610            // Sleep for a very long time; the cancel-watcher will win the select.
3611            tokio::time::sleep(std::time::Duration::from_secs(3600)).await;
3612            Ok(BackgroundOutcome {
3613                summary: "should not reach here".to_string(),
3614                result: json!({}),
3615                raw_output: None,
3616            })
3617        }
3618    }
3619
3620    #[async_trait]
3621    impl Tool for SleepingBackgroundTool {
3622        fn name(&self) -> &str {
3623            "test_background_sleeping"
3624        }
3625
3626        fn display_name(&self) -> Option<&str> {
3627            Some("Test Background Sleeping")
3628        }
3629
3630        fn description(&self) -> &str {
3631            "background test tool that sleeps indefinitely"
3632        }
3633
3634        fn parameters_schema(&self) -> Value {
3635            json!({
3636                "type": "object",
3637                "properties": {}
3638            })
3639        }
3640
3641        async fn execute(&self, _arguments: Value) -> ToolExecutionResult {
3642            ToolExecutionResult::tool_error("foreground unsupported")
3643        }
3644
3645        fn hints(&self) -> ToolHints {
3646            ToolHints::default().with_supports_background(true)
3647        }
3648
3649        fn as_background_executable(&self) -> Option<&dyn BackgroundExecutableTool> {
3650            Some(self)
3651        }
3652    }
3653
3654    // Minimal in-memory SessionTaskRegistry for cancel tests.
3655    // (Mirrors the double in capabilities/session_tasks.rs — kept local because
3656    //  that module is private.)
3657    #[derive(Default)]
3658    struct InMemoryTaskRegistry {
3659        tasks: Mutex<HashMap<String, crate::session_task::SessionTask>>,
3660    }
3661
3662    #[async_trait]
3663    impl crate::session_task::SessionTaskRegistry for InMemoryTaskRegistry {
3664        async fn create(
3665            &self,
3666            input: crate::session_task::CreateSessionTask,
3667        ) -> crate::Result<crate::session_task::SessionTask> {
3668            let mut tasks = self.tasks.lock().unwrap();
3669            if let Some(id) = &input.id
3670                && let Some(existing) = tasks.get(id)
3671            {
3672                return Ok(existing.clone());
3673            }
3674            let task = crate::session_task::new_session_task(input, chrono::Utc::now());
3675            tasks.insert(task.id.clone(), task.clone());
3676            Ok(task)
3677        }
3678
3679        async fn update(
3680            &self,
3681            _session_id: SessionId,
3682            task_id: &str,
3683            update: crate::session_task::SessionTaskUpdate,
3684        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
3685            let mut tasks = self.tasks.lock().unwrap();
3686            let Some(task) = tasks.get_mut(task_id) else {
3687                return Ok(None);
3688            };
3689            crate::session_task::apply_task_update(task, update, chrono::Utc::now());
3690            Ok(Some(task.clone()))
3691        }
3692
3693        async fn get(
3694            &self,
3695            _session_id: SessionId,
3696            task_id: &str,
3697        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
3698            Ok(self.tasks.lock().unwrap().get(task_id).cloned())
3699        }
3700
3701        async fn list(
3702            &self,
3703            _session_id: SessionId,
3704            filter: Option<&crate::session_task::SessionTaskFilter>,
3705        ) -> crate::Result<Vec<crate::session_task::SessionTask>> {
3706            let tasks = self.tasks.lock().unwrap();
3707            Ok(tasks
3708                .values()
3709                .filter(|task| {
3710                    filter.is_none_or(|f| {
3711                        f.kind.as_deref().is_none_or(|kind| task.kind == kind)
3712                            && f.state.is_none_or(|state| task.state == state)
3713                    })
3714                })
3715                .cloned()
3716                .collect())
3717        }
3718
3719        async fn request_cancel(
3720            &self,
3721            _session_id: SessionId,
3722            task_id: &str,
3723        ) -> crate::Result<Option<crate::session_task::SessionTask>> {
3724            let mut tasks = self.tasks.lock().unwrap();
3725            let Some(task) = tasks.get_mut(task_id) else {
3726                return Ok(None);
3727            };
3728            task.cancel_requested_at
3729                .get_or_insert_with(chrono::Utc::now);
3730            task.updated_at = chrono::Utc::now();
3731            Ok(Some(task.clone()))
3732        }
3733
3734        async fn record_message(
3735            &self,
3736            _session_id: SessionId,
3737            task_id: &str,
3738            message: crate::session_task::NewTaskMessage,
3739        ) -> crate::Result<crate::session_task::TaskMessage> {
3740            let tasks = self.tasks.lock().unwrap();
3741            let _task = tasks
3742                .get(task_id)
3743                .ok_or_else(|| crate::AgentLoopError::tool(format!("no task {task_id}")))?;
3744            Ok(crate::session_task::TaskMessage {
3745                id: crate::session_task::generate_task_message_id(),
3746                task_id: task_id.to_string(),
3747                direction: message.direction,
3748                content: message.content,
3749                in_reply_to: message.in_reply_to,
3750                created_at: chrono::Utc::now(),
3751            })
3752        }
3753
3754        async fn list_messages(
3755            &self,
3756            _session_id: SessionId,
3757            _task_id: &str,
3758            _limit: Option<u32>,
3759            _after_id: Option<&str>,
3760        ) -> crate::Result<Vec<crate::session_task::TaskMessage>> {
3761            Ok(Vec::new())
3762        }
3763    }
3764
3765    /// End-to-end cancel test: spawn a long-sleeping background tool, then call
3766    /// `request_cancel` on the task registry, and assert the task ends Canceled.
3767    #[tokio::test]
3768    async fn test_cancel_background_run_via_task_registry() {
3769        let session_id = SessionId::new();
3770        let file_store = Arc::new(TestFileStore::default());
3771        let storage_store = Arc::new(NoopStorageStore);
3772        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3773
3774        let tool_registry = ToolRegistry::builder()
3775            .tool(SpawnBackgroundTool)
3776            .tool(SleepingBackgroundTool)
3777            .build();
3778
3779        let context = ToolContext::with_stores(session_id, file_store.clone(), storage_store)
3780            .with_tool_registry(Arc::new(tool_registry))
3781            .with_session_task_registry(task_registry.clone());
3782
3783        let result = SpawnBackgroundTool
3784            .execute_with_context(
3785                json!({
3786                    "tool": "test_background_sleeping",
3787                    "args": {},
3788                    "signal_on_completion": false
3789                }),
3790                &context,
3791            )
3792            .await;
3793
3794        let ToolExecutionResult::Success(value) = result else {
3795            panic!("spawn_background should succeed");
3796        };
3797        let run_id = value["run_id"].as_str().unwrap().to_string();
3798        let task_id = value["task_id"].as_str().unwrap().to_string();
3799
3800        // Wait until the background task is Running (heartbeat loop started).
3801        tokio::time::timeout(std::time::Duration::from_secs(5), async {
3802            loop {
3803                // Wait for a heartbeat to confirm the watcher is live.
3804                if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3805                    && task.heartbeat_at.is_some()
3806                {
3807                    break;
3808                }
3809                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3810            }
3811        })
3812        .await
3813        .expect("background run should start and send at least one heartbeat");
3814
3815        // Request cancel.
3816        task_registry
3817            .request_cancel(session_id, &task_id)
3818            .await
3819            .expect("request_cancel should succeed");
3820
3821        // Wait for the task to reach Canceled state.
3822        tokio::time::timeout(std::time::Duration::from_secs(10), async {
3823            loop {
3824                if let Ok(Some(task)) = task_registry.get(session_id, &task_id).await
3825                    && task.state == crate::session_task::SessionTaskState::Canceled
3826                {
3827                    break task;
3828                }
3829                tokio::time::sleep(std::time::Duration::from_millis(50)).await;
3830            }
3831        })
3832        .await
3833        .expect("background task should reach Canceled state");
3834
3835        // Verify result.json and output.log were written.
3836        let result_file = file_store
3837            .read_file(session_id, &format!("/.background/{run_id}/result.json"))
3838            .await
3839            .unwrap()
3840            .expect("result.json should exist");
3841        let result_json: Value =
3842            serde_json::from_str(result_file.content.as_deref().unwrap_or_default())
3843                .expect("valid json");
3844        assert_eq!(result_json["status"], "canceled");
3845
3846        let log_file = file_store
3847            .read_file(session_id, &format!("/.background/{run_id}/output.log"))
3848            .await
3849            .unwrap()
3850            .expect("output.log should exist");
3851        assert!(
3852            log_file
3853                .content
3854                .as_deref()
3855                .unwrap_or_default()
3856                .contains("Canceled by request.")
3857        );
3858    }
3859
3860    // -------------------------------------------------------------------------
3861    // reattach_background_run early-guard tests
3862    // -------------------------------------------------------------------------
3863
3864    fn make_reattach_task(spec: serde_json::Value) -> crate::session_task::SessionTask {
3865        use crate::session_task::{SessionTaskState, TaskLinks, TaskWakePolicy};
3866        crate::session_task::SessionTask {
3867            id: "t-reattach".to_string(),
3868            session_id: SessionId::new(),
3869            kind: crate::session_task::TASK_KIND_BACKGROUND_TOOL.to_string(),
3870            display_name: "Reattach test".to_string(),
3871            spec,
3872            state: SessionTaskState::Running,
3873            state_detail: None,
3874            progress: None,
3875            input_request: None,
3876            cancel_requested_at: None,
3877            summary: None,
3878            result_path: None,
3879            artifacts: vec![],
3880            error: None,
3881            attempt: 2,
3882            worker_id: None,
3883            heartbeat_at: None,
3884            links: TaskLinks::default(),
3885            wake_policy: TaskWakePolicy::Silent,
3886            created_at: chrono::Utc::now(),
3887            started_at: None,
3888            finished_at: None,
3889            updated_at: chrono::Utc::now(),
3890        }
3891    }
3892
3893    #[tokio::test]
3894    async fn reattach_fails_with_missing_file_store() {
3895        let session_id = SessionId::new();
3896        // Context with no file_store — only session_task_registry is wired.
3897        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3898        let context =
3899            crate::traits::ToolContext::new(session_id).with_session_task_registry(task_registry);
3900        let task = make_reattach_task(serde_json::json!({
3901            "tool": "get_current_time",
3902            "arguments": {},
3903            "reattachable": true,
3904            "signal_on_completion": true,
3905        }));
3906        let err = reattach_background_run(&task, &context)
3907            .await
3908            .expect_err("should fail without file store");
3909        assert!(
3910            err.to_string().contains("file store"),
3911            "error should mention file store, got: {err}"
3912        );
3913    }
3914
3915    #[tokio::test]
3916    async fn reattach_fails_with_missing_task_registry() {
3917        let session_id = SessionId::new();
3918        let file_store = Arc::new(TestFileStore::default());
3919        let storage_store = Arc::new(NoopStorageStore);
3920        // Context has a file_store but no session_task_registry.
3921        let context =
3922            crate::traits::ToolContext::with_stores(session_id, file_store, storage_store);
3923        let task = make_reattach_task(serde_json::json!({
3924            "tool": "get_current_time",
3925            "arguments": {},
3926            "reattachable": true,
3927            "signal_on_completion": true,
3928        }));
3929        let err = reattach_background_run(&task, &context)
3930            .await
3931            .expect_err("should fail without task registry");
3932        assert!(
3933            err.to_string().contains("task registry"),
3934            "error should mention task registry, got: {err}"
3935        );
3936    }
3937
3938    #[tokio::test]
3939    async fn reattach_fails_with_unknown_tool_name() {
3940        let session_id = SessionId::new();
3941        let file_store = Arc::new(TestFileStore::default());
3942        let storage_store = Arc::new(NoopStorageStore);
3943        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3944        let context =
3945            crate::traits::ToolContext::with_stores(session_id, file_store, storage_store)
3946                .with_session_task_registry(task_registry);
3947        // "test_background" is not in ToolRegistry::with_defaults().
3948        let task = make_reattach_task(serde_json::json!({
3949            "tool": "test_background",
3950            "arguments": {},
3951            "reattachable": true,
3952            "signal_on_completion": true,
3953        }));
3954        let err = reattach_background_run(&task, &context)
3955            .await
3956            .expect_err("should fail for unknown tool");
3957        assert!(
3958            err.to_string().contains("not found in built-in registry"),
3959            "error should mention built-in registry, got: {err}"
3960        );
3961    }
3962
3963    #[tokio::test]
3964    async fn reattach_fails_with_missing_tool_spec_field() {
3965        let session_id = SessionId::new();
3966        let file_store = Arc::new(TestFileStore::default());
3967        let storage_store = Arc::new(NoopStorageStore);
3968        let task_registry = Arc::new(InMemoryTaskRegistry::default());
3969        let context =
3970            crate::traits::ToolContext::with_stores(session_id, file_store, storage_store)
3971                .with_session_task_registry(task_registry);
3972        // Spec has no "tool" field.
3973        let task = make_reattach_task(serde_json::json!({ "reattachable": true }));
3974        let err = reattach_background_run(&task, &context)
3975            .await
3976            .expect_err("should fail with missing tool field");
3977        assert!(
3978            err.to_string().contains("missing 'tool' field"),
3979            "error should mention missing tool field, got: {err}"
3980        );
3981    }
3982}