riglr_core/
tool.rs

1//! Tool orchestration and execution framework.
2//!
3//! Provides the ToolWorker abstraction for coordinating tool execution
4//! with proper error handling, logging, and integration with rig agents.
5//!
6//! This module provides the core abstractions for executing tools in a resilient,
7//! asynchronous manner with support for retries, timeouts, and job queuing.
8//!
9//! ## Overview
10//!
11//! The tool execution system is designed around several key components:
12//!
13//! - **[`Tool`]** - The core trait defining executable tools
14//! - **[`ToolWorker`]** - The execution engine that processes jobs using registered tools
15//! - **[`ExecutionConfig`]** - Configuration for retry behavior, timeouts, and resource limits
16//! - **[`ResourceLimits`]** - Fine-grained control over resource usage per tool type
17//! - **[`WorkerMetrics`]** - Performance and operational metrics for monitoring
18//!
19//! ## Key Features
20//!
21//! ### Resilient Execution
22//! - **Exponential backoff** retry logic with configurable delays
23//! - **Timeout protection** to prevent hanging operations
24//! - **Error classification** to distinguish retriable vs permanent failures
25//! - **Idempotency support** to safely retry operations
26//!
27//! ### Resource Management
28//! - **Semaphore-based limits** for different resource types (RPC calls, HTTP requests)
29//! - **Concurrent execution** with configurable limits per resource
30//! - **Tool-specific resource mapping** based on naming patterns
31//!
32//! ### Monitoring and Observability
33//! - **Comprehensive metrics** tracking success/failure rates
34//! - **Structured logging** with correlation IDs
35//! - **Performance monitoring** with execution timings
36//!
37//! ## Examples
38//!
39//! ### Basic Tool Implementation
40//!
41//! ```ignore
42//! use riglr_core::{Tool, JobResult};
43//! use async_trait::async_trait;
44//! use serde_json::Value;
45//!
46//! struct SimpleCalculator;
47//!
48//! #[async_trait]
49//! impl Tool for SimpleCalculator {
50//!     async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
51//!         let a = params["a"].as_f64().ok_or("Missing parameter 'a'")?;
52//!         let b = params["b"].as_f64().ok_or("Missing parameter 'b'")?;
53//!         let operation = params["op"].as_str().unwrap_or("add");
54//!
55//!         let result = match operation {
56//!             "add" => a + b,
57//!             "multiply" => a * b,
58//!             "divide" => {
59//!                 if b == 0.0 {
60//!                     return Err(ToolError::permanent_string("Division by zero", "Cannot divide by zero"));
61//!                 }
62//!                 a / b
63//!             }
64//!             _ => return Err(ToolError::permanent_string("Unknown operation", "Operation not supported")),
65//!         };
66//!
67//!         Ok(JobResult::success(&result)?)
68//!     }
69//!
70//!     fn name(&self) -> &str {
71//!         "calculator"
72//!     }
73//! }
74//! ```
75//!
76//! ### Worker Setup and Execution
77//!
78//! ```ignore
79//! use riglr_core::{
80//!     ToolWorker, ExecutionConfig, ResourceLimits, Job,
81//!     idempotency::InMemoryIdempotencyStore
82//! };
83//! use std::{sync::Arc, time::Duration};
84//!
85//! # async fn example() -> anyhow::Result<()> {
86//! // Configure execution behavior
87//! let config = ExecutionConfig {
88//!     max_concurrency: 20,
89//!     default_timeout: Duration::from_secs(60),
90//!     max_retries: 3,
91//!     initial_retry_delay: Duration::from_millis(100),
92//!     max_retry_delay: Duration::from_secs(30),
93//!     enable_idempotency: true,
94//!     ..Default::default()
95//! };
96//!
97//! // Set up resource limits
98//! let limits = ResourceLimits::default()
99//!     .with_limit("solana_rpc", 5)   // Limit Solana RPC calls
100//!     .with_limit("evm_rpc", 10)     // Limit EVM RPC calls
101//!     .with_limit("http_api", 20);   // Limit HTTP API calls
102//!
103//! // Create worker with idempotency store
104//! let store = Arc::new(InMemoryIdempotencyStore::new());
105//! let worker = ToolWorker::new(config)
106//!     .with_idempotency_store(store)
107//!     .with_resource_limits(limits);
108//!
109//! // Register tools
110//! # use riglr_core::{Tool, JobResult};
111//! # use async_trait::async_trait;
112//! # struct SimpleCalculator;
113//! # #[async_trait]
114//! # impl Tool for SimpleCalculator {
115//! #     async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
116//! #         Ok(JobResult::success(&42)?)
117//! #     }
118//! #     fn name(&self) -> &str { "calculator" }
119//! # }
120//! worker.register_tool(Arc::new(SimpleCalculator)).await;
121//!
122//! // Process a job
123//! let job = Job::new_idempotent(
124//!     "calculator",
125//!     &serde_json::json!({"a": 10, "b": 5, "op": "add"}),
126//!     3, // max retries
127//!     "calc_10_plus_5" // idempotency key
128//! )?;
129//!
130//! let result = worker.process_job(job).await.unwrap();
131//! println!("Result: {:?}", result);
132//!
133//! // Get metrics
134//! let metrics = worker.metrics();
135//! println!("Jobs processed: {}", metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));
136//! # Ok(())
137//! # }
138//! ```
139
140use async_trait::async_trait;
141use dashmap::DashMap;
142use std::collections::HashMap;
143use std::sync::Arc;
144use std::time::Duration;
145use tokio::sync::{OwnedSemaphorePermit, Semaphore};
146use tracing::{debug, error, info, warn};
147
148use crate::error::{ToolError, WorkerError};
149use crate::idempotency::IdempotencyStore;
150use crate::jobs::{Job, JobResult};
151use crate::queue::JobQueue;
152use crate::signer::SignerContext;
153
154/// A trait defining the execution interface for tools.
155///
156/// This is compatible with `rig::Tool` and provides the foundation
157/// for executing tools within the riglr ecosystem. Tools represent
158/// individual operations that can be executed by the [`ToolWorker`].
159///
160/// ## Design Principles
161///
162/// - **Stateless**: Tools should not maintain internal state between executions
163/// - **Idempotent**: When possible, tools should be safe to retry
164/// - **Error-aware**: Tools should classify errors as retriable or permanent
165/// - **Resource-conscious**: Tools should handle rate limits and timeouts gracefully
166///
167/// ## Error Handling
168///
169/// Tools should carefully distinguish between different types of errors:
170///
171/// - **Retriable errors**: Network timeouts, rate limits, temporary service unavailability
172/// - **Permanent errors**: Invalid parameters, insufficient funds, authorization failures
173/// - **System errors**: Internal configuration issues, unexpected state
174///
175/// ## Examples
176///
177/// ### Basic Tool Implementation
178///
179/// ```ignore
180/// use riglr_core::{Tool, JobResult};
181/// use async_trait::async_trait;
182/// use serde_json::Value;
183///
184/// struct HttpFetcher;
185///
186/// #[async_trait]
187/// impl Tool for HttpFetcher {
188///     async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
189///         let url = params["url"].as_str()
190///             .ok_or("Missing required parameter: url")?;
191///
192///         // Mock HTTP client behavior for this example
193///         if url.starts_with("https://") {
194///             let body = r#"{"data": "success"}"#;
195///             Ok(JobResult::success(&serde_json::json!({"body": body}))?)
196///         } else if url.contains("error") {
197///             // Simulate client error
198///             Ok(JobResult::Failure {
199///                 error: ToolError::permanent_string("Client error: Invalid URL")
200///             })
201///         } else if url.contains("timeout") {
202///             // Simulate timeout
203///             Ok(JobResult::Failure {
204///                 error: ToolError::retriable_string("Request timeout: Connection timed out")
205///             })
206///         } else {
207///             // Simulate server error
208///             Ok(JobResult::Failure {
209///                 error: ToolError::retriable_string("Server error: HTTP 503")
210///             })
211///         }
212///     }
213///
214///     fn name(&self) -> &str {
215///         "http_fetcher"
216///     }
217/// }
218/// ```
219///
220/// ### Tool with Resource-Aware Naming
221///
222/// ```ignore
223/// use riglr_core::{Tool, JobResult};
224/// use async_trait::async_trait;
225/// use serde_json::Value;
226///
227/// // The name starts with "solana_" which will use the solana_rpc resource limit
228/// struct SolanaBalanceChecker;
229///
230/// #[async_trait]
231/// impl Tool for SolanaBalanceChecker {
232///     async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
233///         let address = params["address"].as_str()
234///             .ok_or("Missing required parameter: address")?;
235///
236///         // This tool will be rate-limited by the "solana_rpc" resource limit
237///         // because its name starts with "solana_"
238///
239///         // Implementation would use Solana RPC client here...
240///         let balance = 1.5; // Mock balance
241///
242///         Ok(JobResult::success(&serde_json::json!({
243///             "address": address,
244///             "balance": balance,
245///             "unit": "SOL"
246///         }))?)
247///     }
248///
249///     fn name(&self) -> &str {
250///         "solana_balance_check"  // Name prefix determines resource pool
251///     }
252/// }
253/// ```
254///
255/// ### Tool Using Signer Context
256///
257/// ```ignore
258/// use riglr_core::{Tool, JobResult, SignerContext};
259/// use async_trait::async_trait;
260/// use serde_json::Value;
261///
262/// struct TransferTool;
263///
264/// #[async_trait]
265/// impl Tool for TransferTool {
266///     async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
267///         // Check if we have a signer context
268///         if !SignerContext::is_available().await {
269///             return Ok(JobResult::Failure {
270///                 error: ToolError::permanent_string("Transfer operations require a signer context")
271///             });
272///         }
273///
274///         let signer = SignerContext::current().await
275///             .map_err(|_| "Failed to get signer context")?;
276///
277///         let recipient = params["recipient"].as_str()
278///             .ok_or("Missing required parameter: recipient")?;
279///         let amount = params["amount"].as_f64()
280///             .ok_or("Missing required parameter: amount")?;
281///
282///         // Use the signer to perform the transfer...
283///         // This is just a mock implementation
284///         Ok(JobResult::success_with_tx(
285///             &serde_json::json!({
286///                 "recipient": recipient,
287///                 "amount": amount,
288///                 "status": "completed"
289///             }),
290///             "mock_tx_hash_12345"
291///         )?)
292///     }
293///
294///     fn name(&self) -> &str {
295///         "transfer"
296///     }
297/// }
298/// ```
299#[async_trait]
300pub trait Tool: Send + Sync {
301    /// Execute the tool with the given parameters.
302    ///
303    /// This method performs the core work of the tool. It receives parameters
304    /// as a JSON value and should return a [`JobResult`] indicating success or failure.
305    ///
306    /// # Parameters
307    /// * `params` - JSON parameters passed to the tool. Tools should validate
308    ///              and extract required parameters, returning appropriate errors
309    ///              for missing or invalid data.
310    ///
311    /// # Returns
312    /// * `Ok(JobResult::Success)` - Tool executed successfully with result data
313    /// * `Ok(JobResult::Failure { retriable: true })` - Tool failed but can be retried
314    /// * `Ok(JobResult::Failure { retriable: false })` - Tool failed permanently
315    /// * `Err(Box<dyn Error>)` - Unexpected system error occurred
316    ///
317    /// # Error Classification Guidelines
318    ///
319    /// Return retriable failures (`JobResult::Failure { error: ToolError::retriable_string(...) }`) for:
320    /// - Network timeouts or connection errors
321    /// - Rate limiting (HTTP 429, RPC rate limits) - use `ToolError::rate_limited_string` for these
322    /// - Temporary service unavailability (HTTP 503)
323    /// - Blockchain congestion or temporary RPC failures
324    ///
325    /// Return permanent failures (`JobResult::Failure { error: ToolError::permanent_string(...) }`) for:
326    /// - Invalid parameters or malformed requests - use `ToolError::invalid_input_string` for these
327    /// - Authentication/authorization failures
328    /// - Insufficient funds or balance
329    /// - Invalid blockchain addresses or transaction data
330    ///
331    /// # Examples
332    ///
333    /// ```ignore
334    /// use riglr_core::{Tool, JobResult};
335    /// use async_trait::async_trait;
336    /// use serde_json::Value;
337    ///
338    /// struct WeatherTool;
339    ///
340    /// #[async_trait]
341    /// impl Tool for WeatherTool {
342    ///     async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
343    ///         // Validate parameters
344    ///         let city = params["city"].as_str()
345    ///             .ok_or("Missing required parameter: city")?;
346    ///
347    ///         if city.is_empty() {
348    ///             return Ok(JobResult::Failure {
349    ///                 error: ToolError::invalid_input_string("City name cannot be empty")
350    ///             });
351    ///         }
352    ///
353    ///         // Simulate API call with mock weather service
354    ///         let weather_data = if city == "InvalidCity" {
355    ///             return Ok(JobResult::Failure {
356    ///                 error: ToolError::permanent_string(format!("City not found: {}", city))
357    ///             });
358    ///         } else if city == "TimeoutCity" {
359    ///             return Ok(JobResult::Failure {
360    ///                 error: ToolError::retriable_string("Network timeout")
361    ///             });
362    ///         } else {
363    ///             serde_json::json!({
364    ///                 "city": city,
365    ///                 "temperature": 22,
366    ///                 "condition": "sunny"
367    ///             })
368    ///         };
369    ///
370    ///         Ok(JobResult::success(&weather_data)?)
371    ///     }
372    ///
373    ///     fn name(&self) -> &str {
374    ///         "weather"
375    ///     }
376    /// }
377    /// ```
378    async fn execute(
379        &self,
380        params: serde_json::Value,
381        context: &crate::provider::ApplicationContext,
382    ) -> Result<JobResult, ToolError>;
383
384    /// Get the name of this tool.
385    ///
386    /// The tool name is used for:
387    /// - Tool registration and lookup in the worker
388    /// - Job identification and logging
389    /// - Resource limit mapping (based on name prefixes)
390    /// - Metrics and monitoring
391    ///
392    /// # Naming Conventions
393    ///
394    /// Tool names should follow these patterns for automatic resource management:
395    ///
396    /// - `solana_*` - Uses "solana_rpc" resource pool (e.g., "solana_balance", "solana_transfer")
397    /// - `evm_*` - Uses "evm_rpc" resource pool (e.g., "evm_balance", "evm_swap")
398    /// - `web_*` - Uses "http_api" resource pool (e.g., "web_fetch", "web_scrape")
399    /// - Others - Use default resource pool
400    ///
401    /// # Returns
402    /// A string identifier for this tool that must be unique within a worker.
403    ///
404    /// # Examples
405    ///
406    /// ```ignore
407    /// use riglr_core::Tool;
408    /// # use async_trait::async_trait;
409    /// # use riglr_core::JobResult;
410    ///
411    /// struct BitcoinPriceChecker;
412    ///
413    /// # #[async_trait]
414    /// # impl Tool for BitcoinPriceChecker {
415    /// #     async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
416    /// #         Ok(JobResult::success(&42)?)
417    /// #     }
418    /// #
419    /// fn name(&self) -> &str {
420    ///     "web_bitcoin_price"  // Will use "http_api" resource pool
421    /// }
422    /// # }
423    /// ```
424    fn name(&self) -> &str;
425
426    /// A concise, AI-friendly description of this tool's purpose.
427    ///
428    /// This should be a short, human-readable sentence that explains
429    /// what the tool does and when to use it. It's intended for AI models
430    /// and end-user UIs, and is separate from developer-facing rustdoc.
431    ///
432    /// When using the `#[tool]` macro from `riglr-macros`, this value is:
433    /// - Taken from the explicit `#[tool(description = "...")]` attribute when provided
434    /// - Otherwise derived from the item's doc comments
435    /// - Falls back to an empty string if neither is present
436    fn description(&self) -> &str;
437
438    /// Returns the JSON schema for this tool's parameters.
439    ///
440    /// This schema is used by AI models to understand what parameters
441    /// the tool accepts and their types. It should be a valid JSON Schema
442    /// object describing the parameters structure.
443    ///
444    /// The default implementation returns a generic object schema that
445    /// accepts any properties, which may not work well with all AI providers.
446    /// Tools should override this to provide their actual parameter schema.
447    fn schema(&self) -> serde_json::Value {
448        // Default to a generic object schema
449        serde_json::json!({
450            "type": "object",
451            "additionalProperties": true
452        })
453    }
454}
455
456/// Configuration for tool execution behavior.
457///
458/// This struct controls all aspects of how tools are executed by the [`ToolWorker`],
459/// including retry behavior, timeouts, concurrency limits, and idempotency settings.
460///
461/// # Examples
462///
463/// ```ignore
464/// use riglr_core::ExecutionConfig;
465/// use std::time::Duration;
466///
467/// // Default configuration
468/// let config = ExecutionConfig::default();
469///
470/// // Custom configuration for high-throughput scenario
471/// let high_throughput_config = ExecutionConfig {
472///     max_concurrency: 50,
473///     default_timeout: Duration::from_secs(120),
474///     max_retries: 5,
475///     initial_retry_delay: Duration::from_millis(200),
476///     max_retry_delay: Duration::from_secs(60),
477///     enable_idempotency: true,
478///     ..Default::default()
479/// };
480///
481/// // Conservative configuration for sensitive operations
482/// let conservative_config = ExecutionConfig {
483///     max_concurrency: 5,
484///     default_timeout: Duration::from_secs(300),
485///     max_retries: 1,
486///     initial_retry_delay: Duration::from_secs(5),
487///     max_retry_delay: Duration::from_secs(30),
488///     enable_idempotency: true,
489///     ..Default::default()
490/// };
491/// ```
492#[derive(Debug, Clone)]
493pub struct ExecutionConfig {
494    /// Maximum number of concurrent executions per resource type.
495    ///
496    /// This controls the default concurrency limit when no specific resource
497    /// limit is configured. Individual resource types can have their own
498    /// limits configured via [`ResourceLimits`].
499    ///
500    /// **Default**: 10
501    pub max_concurrency: usize,
502
503    /// Default timeout for tool execution.
504    ///
505    /// Individual tool executions will be cancelled if they exceed this duration.
506    /// Tools should be designed to complete within reasonable time bounds.
507    ///
508    /// **Default**: 30 seconds
509    pub default_timeout: Duration,
510
511    /// Maximum number of retry attempts for failed operations.
512    ///
513    /// This applies only to retriable failures. Permanent failures are never retried.
514    /// The total number of execution attempts will be `max_retries + 1`.
515    ///
516    /// **Default**: 3 retries (4 total attempts)
517    pub max_retries: u32,
518
519    /// Initial retry delay for exponential backoff.
520    ///
521    /// The first retry will wait this long after the initial failure.
522    /// Subsequent retries will use exponentially increasing delays.
523    ///
524    /// **Default**: 100 milliseconds
525    pub initial_retry_delay: Duration,
526
527    /// Maximum retry delay for exponential backoff.
528    ///
529    /// Retry delays will not exceed this value, even with exponential backoff.
530    /// This prevents excessive wait times for highly retried operations.
531    ///
532    /// **Default**: 10 seconds
533    pub max_retry_delay: Duration,
534
535    /// TTL for idempotency cache entries.
536    ///
537    /// Completed operations with idempotency keys will be cached for this duration.
538    /// Subsequent requests with the same key will return the cached result.
539    ///
540    /// **Default**: 1 hour
541    pub idempotency_ttl: Duration,
542
543    /// Whether to enable idempotency checking.
544    ///
545    /// When enabled, jobs with idempotency keys will have their results cached
546    /// and subsequent identical requests will return cached results instead of
547    /// re-executing the tool.
548    ///
549    /// **Default**: true
550    pub enable_idempotency: bool,
551}
552
553impl Default for ExecutionConfig {
554    fn default() -> Self {
555        Self {
556            max_concurrency: 10,
557            default_timeout: Duration::from_secs(30),
558            max_retries: 3,
559            initial_retry_delay: Duration::from_millis(100),
560            max_retry_delay: Duration::from_secs(10),
561            idempotency_ttl: Duration::from_secs(3600), // 1 hour
562            enable_idempotency: true,
563        }
564    }
565}
566
567/// Resource limits configuration for fine-grained concurrency control.
568///
569/// This struct allows you to set different concurrency limits for different
570/// types of operations, preventing any single resource type from overwhelming
571/// external services or consuming too many system resources.
572///
573/// Resource limits are applied based on tool name prefixes:
574/// - Tools starting with `solana_` use the "solana_rpc" resource pool
575/// - Tools starting with `evm_` use the "evm_rpc" resource pool
576/// - Tools starting with `web_` use the "http_api" resource pool
577/// - All other tools use the default concurrency limit
578///
579/// # Examples
580///
581/// ```rust
582/// use riglr_core::ResourceLimits;
583///
584/// // Create limits for different resource types
585/// let limits = ResourceLimits::default()
586///     .with_limit("solana_rpc", 3)     // Max 3 concurrent Solana RPC calls
587///     .with_limit("evm_rpc", 8)        // Max 8 concurrent EVM RPC calls
588///     .with_limit("http_api", 15)      // Max 15 concurrent HTTP requests
589///     .with_limit("database", 5);      // Max 5 concurrent database operations
590///
591/// // Use default limits (solana_rpc: 5, evm_rpc: 10, http_api: 20)
592/// let default_limits = ResourceLimits::default();
593/// ```
594///
595/// # Resource Pool Mapping
596///
597/// The system automatically maps tool names to resource pools:
598///
599/// ```text
600/// Tool Name              → Resource Pool    → Limit
601/// "solana_balance"       → "solana_rpc"     → configured limit
602/// "evm_swap"             → "evm_rpc"        → configured limit
603/// "web_fetch"            → "http_api"       → configured limit
604/// "custom_tool"          → default          → ExecutionConfig.max_concurrency
605/// ```
606#[derive(Debug, Clone)]
607pub struct ResourceLimits {
608    /// Resource name to semaphore mapping.
609    ///
610    /// Each semaphore controls the maximum number of concurrent operations
611    /// for its associated resource type.
612    semaphores: Arc<HashMap<String, Arc<Semaphore>>>,
613}
614
615impl ResourceLimits {
616    /// Add a resource limit for the specified resource type.
617    ///
618    /// This creates a semaphore that will limit concurrent access to the
619    /// specified resource. Tools with names matching the resource mapping
620    /// will be subject to this limit.
621    ///
622    /// # Parameters
623    /// * `resource` - The resource identifier (e.g., "solana_rpc", "evm_rpc")
624    /// * `limit` - Maximum number of concurrent operations for this resource
625    ///
626    /// # Returns
627    /// Self, for method chaining
628    ///
629    /// # Examples
630    ///
631    /// ```ignore
632    /// use riglr_core::ResourceLimits;
633    ///
634    /// let limits = ResourceLimits::default()
635    ///     .with_limit("solana_rpc", 3)     // Limit Solana RPC calls
636    ///     .with_limit("database", 10)      // Limit database connections
637    ///     .with_limit("external_api", 5);  // Limit external API calls
638    /// ```
639    pub fn with_limit(mut self, resource: impl Into<String>, limit: usize) -> Self {
640        let semaphores = Arc::make_mut(&mut self.semaphores);
641        semaphores.insert(resource.into(), Arc::new(Semaphore::new(limit)));
642        self
643    }
644
645    /// Get the semaphore for a specific resource type.
646    ///
647    /// This is used internally by the [`ToolWorker`] to acquire permits
648    /// before executing tools. Returns `None` if no limit is configured
649    /// for the specified resource.
650    ///
651    /// # Parameters
652    /// * `resource` - The resource identifier to look up
653    ///
654    /// # Returns
655    /// * `Some(Arc<Semaphore>)` - If a limit is configured for this resource
656    /// * `None` - If no limit is configured (will use default limit)
657    ///
658    /// # Examples
659    ///
660    /// ```rust
661    /// use riglr_core::ResourceLimits;
662    ///
663    /// let limits = ResourceLimits::default()
664    ///     .with_limit("test_resource", 5);
665    ///
666    /// assert!(limits.get_semaphore("test_resource").is_some());
667    /// assert!(limits.get_semaphore("unknown_resource").is_none());
668    /// ```
669    pub fn get_semaphore(&self, resource: &str) -> Option<Arc<Semaphore>> {
670        self.semaphores.get(resource).cloned()
671    }
672}
673
674impl Default for ResourceLimits {
675    fn default() -> Self {
676        let mut semaphores = HashMap::new();
677        semaphores.insert("solana_rpc".to_string(), Arc::new(Semaphore::new(5)));
678        semaphores.insert("evm_rpc".to_string(), Arc::new(Semaphore::new(10)));
679        semaphores.insert("http_api".to_string(), Arc::new(Semaphore::new(20)));
680        Self {
681            semaphores: Arc::new(semaphores),
682        }
683    }
684}
685
686/// A worker that processes jobs from a queue using registered tools.
687///
688/// The `ToolWorker` is the core execution engine of the riglr system. It manages
689/// registered tools, processes jobs with resilience features, and provides
690/// comprehensive monitoring and observability.
691///
692/// ## Key Features
693///
694/// - **Resilient execution**: Automatic retries with exponential backoff
695/// - **Resource management**: Configurable limits per resource type
696/// - **Idempotency support**: Cached results for safe retries
697/// - **Comprehensive monitoring**: Built-in metrics and structured logging
698/// - **Concurrent processing**: Efficient parallel job execution
699///
700/// ## Architecture
701///
702/// The worker operates on a job-based model where:
703/// 1. Jobs are dequeued from a [`JobQueue`]
704/// 2. Tools are looked up by name and executed
705/// 3. Results are processed with retry logic for failures
706/// 4. Successful results are optionally cached for idempotency
707/// 5. Metrics are updated for monitoring
708///
709/// ## Examples
710///
711/// ### Basic Setup
712///
713/// ```ignore
714/// use riglr_core::{
715///     ToolWorker, ExecutionConfig, ResourceLimits,
716///     idempotency::InMemoryIdempotencyStore
717/// };
718/// use std::sync::Arc;
719///
720/// # async fn example() -> anyhow::Result<()> {
721/// // Create worker with default configuration
722/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
723///     ExecutionConfig::default()
724/// );
725///
726/// // Add idempotency store for safe retries
727/// let store = Arc::new(InMemoryIdempotencyStore::new());
728/// let worker = worker.with_idempotency_store(store);
729///
730/// // Configure resource limits
731/// let limits = ResourceLimits::default()
732///     .with_limit("solana_rpc", 3)
733///     .with_limit("evm_rpc", 5);
734/// let worker = worker.with_resource_limits(limits);
735/// # Ok(())
736/// # }
737/// ```
738///
739/// ### Processing Jobs
740///
741/// ```ignore
742/// use riglr_core::{ToolWorker, Job, Tool, JobResult, ExecutionConfig};
743/// use riglr_core::idempotency::InMemoryIdempotencyStore;
744/// use async_trait::async_trait;
745/// use std::sync::Arc;
746///
747/// # struct ExampleTool;
748/// # #[async_trait]
749/// # impl Tool for ExampleTool {
750/// #     async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
751/// #         Ok(JobResult::success(&"example result")?)
752/// #     }
753/// #     fn name(&self) -> &str { "example" }
754/// # }
755/// # async fn example() -> anyhow::Result<()> {
756/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
757///     ExecutionConfig::default()
758/// );
759///
760/// // Register tools
761/// worker.register_tool(Arc::new(ExampleTool)).await;
762///
763/// // Process a job
764/// let job = Job::new("example", &serde_json::json!({}), 3)?;
765/// let result = worker.process_job(job).await.unwrap();
766///
767/// println!("Job result: {:?}", result);
768///
769/// // Check metrics
770/// let metrics = worker.metrics();
771/// println!("Jobs processed: {}",
772///     metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));
773/// # Ok(())
774/// # }
775/// ```
776pub struct ToolWorker<I: IdempotencyStore + 'static> {
777    /// Registered tools, indexed by name for fast lookup
778    tools: Arc<DashMap<String, Arc<dyn Tool>>>,
779
780    /// Default semaphore for general concurrency control
781    default_semaphore: Arc<Semaphore>,
782
783    /// Resource-specific limits for fine-grained control
784    resource_limits: ResourceLimits,
785
786    /// Configuration for retry behavior, timeouts, etc.
787    config: ExecutionConfig,
788
789    /// Optional idempotency store for caching results
790    idempotency_store: Option<Arc<I>>,
791
792    /// Performance and operational metrics
793    metrics: Arc<WorkerMetrics>,
794
795    /// Application context providing access to RPC providers and shared resources
796    app_context: crate::provider::ApplicationContext,
797}
798
799/// Performance and operational metrics for monitoring worker health.
800///
801/// These metrics provide insight into worker performance and can be used
802/// for monitoring, alerting, and performance optimization.
803///
804/// All metrics use atomic integers for thread-safe updates and reads.
805///
806/// ## Metrics Tracked
807///
808/// - **jobs_processed**: Total number of jobs dequeued and processed
809/// - **jobs_succeeded**: Number of jobs that completed successfully
810/// - **jobs_failed**: Number of jobs that failed permanently (after all retries)
811/// - **jobs_retried**: Total number of retry attempts across all jobs
812///
813/// ## Examples
814///
815/// ```ignore
816/// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
817/// use std::sync::atomic::Ordering;
818///
819/// # async fn example() {
820/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
821///     ExecutionConfig::default()
822/// );
823///
824/// let metrics = worker.metrics();
825///
826/// // Read current metrics
827/// let processed = metrics.jobs_processed.load(Ordering::Relaxed);
828/// let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
829/// let failed = metrics.jobs_failed.load(Ordering::Relaxed);
830/// let retried = metrics.jobs_retried.load(Ordering::Relaxed);
831///
832/// println!("Worker Stats:");
833/// println!("  Processed: {}", processed);
834/// println!("  Succeeded: {}", succeeded);
835/// println!("  Failed: {}", failed);
836/// println!("  Retried: {}", retried);
837/// println!("  Success Rate: {:.2}%",
838///     if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 });
839/// # }
840/// ```
841#[derive(Debug, Default)]
842pub struct WorkerMetrics {
843    /// Total number of jobs processed (dequeued and executed)
844    pub jobs_processed: std::sync::atomic::AtomicU64,
845
846    /// Number of jobs that completed successfully
847    pub jobs_succeeded: std::sync::atomic::AtomicU64,
848
849    /// Number of jobs that failed permanently (after all retries)
850    pub jobs_failed: std::sync::atomic::AtomicU64,
851
852    /// Total number of retry attempts across all jobs
853    pub jobs_retried: std::sync::atomic::AtomicU64,
854}
855
856impl<I: IdempotencyStore + 'static> std::fmt::Debug for ToolWorker<I> {
857    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
858        f.debug_struct("ToolWorker")
859            .field("tools_count", &self.tools.len())
860            .field("config", &self.config)
861            .finish_non_exhaustive()
862    }
863}
864
865impl<I: IdempotencyStore + 'static> ToolWorker<I> {
866    /// Create a new tool worker with the given configuration and application context.
867    ///
868    /// This creates a worker ready to process jobs, but no tools are registered yet.
869    /// Use [`register_tool()`](Self::register_tool) to add tools before processing jobs.
870    ///
871    /// # Parameters
872    /// * `config` - Execution configuration controlling retry behavior, timeouts, etc.
873    /// * `app_context` - Application context providing access to RPC providers and shared resources
874    ///
875    /// # Examples
876    ///
877    /// ```ignore
878    /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
879    /// use std::time::Duration;
880    /// use riglr_config::Config;
881    ///
882    /// let exec_config = ExecutionConfig {
883    ///     max_concurrency: 20,
884    ///     default_timeout: Duration::from_secs(60),
885    ///     max_retries: 5,
886    ///     ..Default::default()
887    /// };
888    /// let config = Config::from_env();
889    /// let app_context = ApplicationContext::from_config(&config);
890    ///
891    /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(exec_config, app_context);
892    /// ```
893    pub fn new(config: ExecutionConfig, app_context: crate::provider::ApplicationContext) -> Self {
894        Self {
895            tools: Arc::new(DashMap::new()),
896            default_semaphore: Arc::new(Semaphore::new(config.max_concurrency)),
897            resource_limits: ResourceLimits::default(),
898            config,
899            idempotency_store: None,
900            metrics: Arc::new(WorkerMetrics::default()),
901            app_context,
902        }
903    }
904
905    /// Configure an idempotency store for result caching.
906    ///
907    /// When an idempotency store is configured, jobs with idempotency keys
908    /// will have their results cached. Subsequent executions with the same
909    /// idempotency key will return the cached result instead of re-executing.
910    ///
911    /// # Parameters
912    /// * `store` - The idempotency store implementation to use
913    ///
914    /// # Returns
915    /// Self, for method chaining
916    ///
917    /// # Examples
918    ///
919    /// ```ignore
920    /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
921    /// use std::sync::Arc;
922    ///
923    /// let store = Arc::new(InMemoryIdempotencyStore::new());
924    /// let worker = ToolWorker::new(ExecutionConfig::default())
925    ///     .with_idempotency_store(store);
926    /// ```
927    pub fn with_idempotency_store(mut self, store: Arc<I>) -> Self {
928        self.idempotency_store = Some(store);
929        self
930    }
931
932    /// Configure custom resource limits.
933    ///
934    /// Resource limits control how many concurrent operations can run
935    /// for each resource type. This prevents overwhelming external
936    /// services and provides fine-grained control over resource usage.
937    ///
938    /// # Parameters
939    /// * `limits` - The resource limits configuration to use
940    ///
941    /// # Returns
942    /// Self, for method chaining
943    ///
944    /// # Examples
945    ///
946    /// ```ignore
947    /// use riglr_core::{ToolWorker, ExecutionConfig, ResourceLimits, idempotency::InMemoryIdempotencyStore};
948    ///
949    /// let limits = ResourceLimits::default()
950    ///     .with_limit("solana_rpc", 3)
951    ///     .with_limit("evm_rpc", 8)
952    ///     .with_limit("http_api", 15);
953    ///
954    /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default())
955    ///     .with_resource_limits(limits);
956    /// ```
957    pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
958        self.resource_limits = limits;
959        self
960    }
961
962    /// Register a tool with this worker.
963    ///
964    /// Tools must be registered before they can be executed by jobs.
965    /// Each tool is indexed by its name, so tool names must be unique
966    /// within a single worker.
967    ///
968    /// # Parameters
969    /// * `tool` - The tool implementation to register
970    ///
971    /// # Panics
972    /// This method does not panic, but if a tool with the same name
973    /// is already registered, it will be replaced.
974    ///
975    /// # Examples
976    ///
977    /// ```ignore
978    /// use riglr_core::{ToolWorker, ExecutionConfig, Tool, JobResult, idempotency::InMemoryIdempotencyStore};
979    /// use async_trait::async_trait;
980    /// use std::sync::Arc;
981    ///
982    /// struct CalculatorTool;
983    ///
984    /// #[async_trait]
985    /// impl Tool for CalculatorTool {
986    ///     async fn execute(&self, params: serde_json::Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
987    ///         let a = params["a"].as_f64().unwrap_or(0.0);
988    ///         let b = params["b"].as_f64().unwrap_or(0.0);
989    ///         Ok(JobResult::success(&(a + b))?)
990    ///     }
991    ///
992    ///     fn name(&self) -> &str {
993    ///         "calculator"
994    ///     }
995    /// }
996    ///
997    /// # async fn example() {
998    /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default());
999    /// worker.register_tool(Arc::new(CalculatorTool)).await;
1000    /// # }
1001    /// ```
1002    pub async fn register_tool(&self, tool: Arc<dyn Tool>) {
1003        self.tools.insert(tool.name().to_string(), tool);
1004    }
1005
1006    /// Get access to worker metrics.
1007    ///
1008    /// The returned metrics can be used for monitoring worker performance
1009    /// and health. All metrics are thread-safe and can be read at any time.
1010    ///
1011    /// # Returns
1012    /// A reference to the worker's metrics
1013    ///
1014    /// # Examples
1015    ///
1016    /// ```ignore
1017    /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
1018    /// use std::sync::atomic::Ordering;
1019    ///
1020    /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default());
1021    /// let metrics = worker.metrics();
1022    ///
1023    /// println!("Jobs processed: {}",
1024    ///     metrics.jobs_processed.load(Ordering::Relaxed));
1025    /// println!("Success rate: {:.2}%", {
1026    ///     let processed = metrics.jobs_processed.load(Ordering::Relaxed);
1027    ///     let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
1028    ///     if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 }
1029    /// });
1030    /// ```
1031    pub fn metrics(&self) -> &WorkerMetrics {
1032        &self.metrics
1033    }
1034
1035    /// Process a single job with all resilience features.
1036    ///
1037    /// Returns:
1038    /// - `Ok(JobResult)` - The job was processed (successfully or with business logic failure)
1039    /// - `Err(WorkerError)` - System-level worker failure (tool not found, semaphore issues, etc.)
1040    pub async fn process_job(&self, mut job: Job) -> Result<JobResult, WorkerError> {
1041        // Check idempotency cache first
1042        if let Some(cached_result) = self.check_idempotency_cache(&job).await? {
1043            return Ok(Arc::try_unwrap(cached_result).unwrap_or_else(|arc| (*arc).clone()));
1044        }
1045
1046        // Apply rate limiting for operations within a SignerContext that has a user_id
1047        if let Ok(signer) = SignerContext::current().await {
1048            if let Some(user_id) = signer.user_id() {
1049                if let Err(rate_limit_error) =
1050                    self.app_context.rate_limiter.check_rate_limit(&user_id)
1051                {
1052                    // Convert rate limit error to a JobResult::Failure
1053                    return Ok(JobResult::Failure {
1054                        error: rate_limit_error,
1055                    });
1056                }
1057            }
1058        }
1059
1060        // Acquire appropriate semaphore
1061        let _permit = self.acquire_semaphore(&job.tool_name).await.map_err(|e| {
1062            WorkerError::SemaphoreAcquisition {
1063                tool_name: job.tool_name.clone(),
1064                source_message: e.to_string(),
1065            }
1066        })?;
1067
1068        // Get the tool for this job
1069        let tool = self.get_tool(&job.tool_name).await?;
1070
1071        // Execute with retries
1072        let result = self.execute_with_retries(tool, &mut job).await;
1073
1074        // Cache successful results
1075        if let Ok(ref job_result) = result {
1076            self.cache_result(&job, job_result).await;
1077        }
1078
1079        result
1080    }
1081
1082    /// Check if there's a cached result for this job
1083    async fn check_idempotency_cache(
1084        &self,
1085        job: &Job,
1086    ) -> Result<Option<Arc<JobResult>>, WorkerError> {
1087        if let Some(ref idempotency_key) = job.idempotency_key {
1088            if self.config.enable_idempotency {
1089                if let Some(ref store) = self.idempotency_store {
1090                    match store.get(idempotency_key).await {
1091                        Ok(Some(cached_result)) => {
1092                            info!(
1093                                "Returning cached result for idempotency key: {}",
1094                                idempotency_key
1095                            );
1096                            return Ok(Some(cached_result));
1097                        }
1098                        Ok(None) => {} // No cached result
1099                        Err(e) => {
1100                            return Err(WorkerError::IdempotencyStore {
1101                                source_message: e.to_string(),
1102                            });
1103                        }
1104                    }
1105                }
1106            }
1107        }
1108        Ok(None)
1109    }
1110
1111    /// Get a tool from the registry
1112    async fn get_tool(&self, tool_name: &str) -> Result<Arc<dyn Tool>, WorkerError> {
1113        self.tools
1114            .get(tool_name)
1115            .ok_or_else(|| WorkerError::ToolNotFound {
1116                tool_name: tool_name.to_string(),
1117            })
1118            .map(|entry| (*entry).clone())
1119    }
1120
1121    /// Execute a tool with retry logic using unified retry helper
1122    async fn execute_with_retries(
1123        &self,
1124        tool: Arc<dyn Tool>,
1125        job: &mut Job,
1126    ) -> Result<JobResult, WorkerError> {
1127        use crate::retry::{retry_async, ErrorClass, RetryConfig};
1128
1129        // Create retry config from worker config
1130        let retry_config = RetryConfig {
1131            max_retries: job.max_retries,
1132            base_delay_ms: self.config.initial_retry_delay.as_millis() as u64,
1133            max_delay_ms: self.config.max_retry_delay.as_millis() as u64,
1134            backoff_multiplier: 2.0,
1135            use_jitter: true,
1136        };
1137
1138        // Use unified retry helper
1139        let result = retry_async(
1140            || async { self.execute_single_attempt(&tool, &job.params).await },
1141            |error| {
1142                // Classify error based on ToolError's is_retriable
1143                if error.is_retriable() {
1144                    ErrorClass::Retryable
1145                } else {
1146                    ErrorClass::Permanent
1147                }
1148            },
1149            &retry_config,
1150            &format!("job_{}", job.job_id),
1151        )
1152        .await;
1153
1154        // Update metrics based on result
1155        match result {
1156            Ok(job_result) => {
1157                self.metrics
1158                    .jobs_succeeded
1159                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1160                Ok(job_result)
1161            }
1162            Err(tool_error) => {
1163                self.metrics
1164                    .jobs_failed
1165                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166                // Track retries that occurred - the retry helper manages the actual retries
1167                if retry_config.max_retries > 0 {
1168                    self.metrics
1169                        .jobs_retried
1170                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1171                }
1172                Ok(JobResult::Failure { error: tool_error })
1173            }
1174        }
1175    }
1176
1177    /// Execute a single attempt of a tool
1178    async fn execute_single_attempt(
1179        &self,
1180        tool: &Arc<dyn Tool>,
1181        params: &serde_json::Value,
1182    ) -> Result<JobResult, ToolError> {
1183        // Execute with timeout
1184        let result = tokio::time::timeout(
1185            self.config.default_timeout,
1186            tool.execute(params.clone(), &self.app_context),
1187        )
1188        .await;
1189
1190        match result {
1191            Ok(Ok(job_result)) => Ok(job_result),
1192            Ok(Err(e)) => {
1193                // Propagate the ToolError directly to preserve its structure
1194                warn!("Tool execution failed: {}", e);
1195                Err(e)
1196            }
1197            Err(_) => {
1198                let error_msg = format!(
1199                    "Tool execution timed out after {:?}",
1200                    self.config.default_timeout
1201                );
1202                warn!("{}", error_msg);
1203                // Timeout errors are retriable
1204                Err(ToolError::retriable_string(error_msg))
1205            }
1206        }
1207    }
1208
1209    // Note: create_backoff_strategy and wait_with_backoff methods removed
1210    // as they are now handled by the unified retry helper in retry.rs
1211
1212    /// Cache a successful result
1213    async fn cache_result(&self, job: &Job, job_result: &JobResult) {
1214        if let Some(ref idempotency_key) = job.idempotency_key {
1215            if self.config.enable_idempotency {
1216                if let Some(ref store) = self.idempotency_store {
1217                    let _ = store
1218                        .set(
1219                            idempotency_key,
1220                            Arc::new(job_result.clone()),
1221                            self.config.idempotency_ttl,
1222                        )
1223                        .await;
1224                }
1225            }
1226        }
1227    }
1228
1229    /// Acquire the appropriate semaphore for a tool
1230    async fn acquire_semaphore(
1231        &self,
1232        tool_name: &str,
1233    ) -> Result<OwnedSemaphorePermit, Box<dyn std::error::Error + Send + Sync>> {
1234        // Check if there's a specific resource limit for this tool
1235        let resource_name = match tool_name {
1236            name if name.starts_with("solana_") => "solana_rpc",
1237            name if name.starts_with("evm_") => "evm_rpc",
1238            name if name.starts_with("web_") => "http_api",
1239            _ => "",
1240        };
1241
1242        if !resource_name.is_empty() {
1243            if let Some(semaphore) = self.resource_limits.get_semaphore(resource_name) {
1244                return Ok(semaphore.acquire_owned().await?);
1245            }
1246        }
1247
1248        // Fall back to default semaphore
1249        Ok(self.default_semaphore.clone().acquire_owned().await?)
1250    }
1251
1252    /// Start the worker loop, processing jobs from the given queue.
1253    ///
1254    /// The worker will continue processing jobs until the provided cancellation token
1255    /// is cancelled. This allows for graceful shutdown where in-flight jobs can complete
1256    /// before the worker stops.
1257    ///
1258    /// # Parameters
1259    /// * `queue` - The job queue to process jobs from
1260    /// * `cancellation_token` - Token to signal when the worker should stop
1261    ///
1262    /// # Examples
1263    ///
1264    /// ```rust
1265    /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
1266    /// use riglr_core::provider::ApplicationContext;
1267    /// use riglr_core::queue::InMemoryJobQueue;
1268    /// use riglr_config::ConfigBuilder;
1269    /// use tokio_util::sync::CancellationToken;
1270    /// use std::sync::Arc;
1271    ///
1272    /// # async fn example() -> anyhow::Result<()> {
1273    /// let config = ConfigBuilder::default().build().unwrap();
1274    /// let app_context = ApplicationContext::from_config(&config);
1275    /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
1276    /// let queue = Arc::new(InMemoryJobQueue::new());
1277    /// let cancellation_token = CancellationToken::new();
1278    ///
1279    /// // Start worker in background
1280    /// let token_clone = cancellation_token.clone();
1281    /// let worker_handle = tokio::spawn(async move {
1282    ///     worker.run(queue, token_clone).await
1283    /// });
1284    ///
1285    /// // Later, signal shutdown
1286    /// cancellation_token.cancel();
1287    /// // Await the task and ignore the inner result for simplicity in docs
1288    /// let _ = worker_handle.await;
1289    /// # Ok(())
1290    /// # }
1291    /// ```
1292    pub async fn run<Q: JobQueue>(
1293        &self,
1294        queue: Arc<Q>,
1295        cancellation_token: tokio_util::sync::CancellationToken,
1296    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1297        info!(
1298            "Starting ToolWorker with {} tools registered",
1299            self.tools.len()
1300        );
1301
1302        while !cancellation_token.is_cancelled() {
1303            tokio::select! {
1304                // Check for cancellation
1305                _ = cancellation_token.cancelled() => {
1306                    info!("Worker shutdown requested, stopping job processing");
1307                    break;
1308                }
1309
1310                // Try to dequeue and process jobs
1311                result = queue.dequeue_with_timeout(Duration::from_secs(5)) => {
1312                    match result {
1313                        Ok(Some(job)) => {
1314                            let job_id = job.job_id;
1315                            let tool_name = job.tool_name.clone();
1316
1317                            self.metrics
1318                                .jobs_processed
1319                                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1320
1321                            // Spawn task to process job asynchronously
1322                            let worker = self.clone();
1323                            tokio::spawn(async move {
1324                                match worker.process_job(job).await {
1325                                    Ok(job_result) => {
1326                                        if job_result.is_success() {
1327                                            info!("Job {} ({}) completed successfully", job_id, tool_name);
1328                                        } else {
1329                                            warn!(
1330                                                "Job {} ({}) failed: {:?}",
1331                                                job_id, tool_name, job_result
1332                                            );
1333                                        }
1334                                    }
1335                                    Err(e) => {
1336                                        error!("Job {} ({}) processing error: {}", job_id, tool_name, e);
1337                                    }
1338                                }
1339                            });
1340                        }
1341                        Ok(None) => {
1342                            // No jobs available, continue
1343                            debug!("No jobs available in queue");
1344                        }
1345                        Err(e) => {
1346                            error!("Failed to dequeue job: {}", e);
1347                            tokio::time::sleep(Duration::from_secs(1)).await;
1348                        }
1349                    }
1350                }
1351            }
1352        }
1353
1354        info!("ToolWorker shutdown completed");
1355        Ok(())
1356    }
1357}
1358
1359// Implement Clone for ToolWorker to enable spawning tasks
1360impl<I: IdempotencyStore + 'static> Clone for ToolWorker<I> {
1361    fn clone(&self) -> Self {
1362        Self {
1363            tools: self.tools.clone(),
1364            default_semaphore: self.default_semaphore.clone(),
1365            resource_limits: self.resource_limits.clone(),
1366            config: self.config.clone(),
1367            idempotency_store: self.idempotency_store.clone(),
1368            metrics: self.metrics.clone(),
1369            app_context: self.app_context.clone(),
1370        }
1371    }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376    use super::*;
1377    use crate::idempotency::InMemoryIdempotencyStore;
1378    use crate::jobs::Job;
1379    use crate::provider::ApplicationContext;
1380    use uuid::Uuid;
1381
1382    fn test_app_context() -> ApplicationContext {
1383        // Use default configuration for tests
1384        ApplicationContext::default()
1385    }
1386
1387    struct MockTool {
1388        name: String,
1389        should_fail: bool,
1390    }
1391
1392    struct RetriableMockTool {
1393        name: String,
1394    }
1395
1396    #[async_trait]
1397    impl Tool for MockTool {
1398        async fn execute(
1399            &self,
1400            _params: serde_json::Value,
1401            _context: &crate::provider::ApplicationContext,
1402        ) -> Result<JobResult, ToolError> {
1403            if self.should_fail {
1404                Err(ToolError::permanent_string("Mock failure"))
1405            } else {
1406                Ok(JobResult::Success {
1407                    value: serde_json::json!({"result": "success"}),
1408                    tx_hash: None,
1409                })
1410            }
1411        }
1412
1413        fn name(&self) -> &str {
1414            &self.name
1415        }
1416
1417        fn description(&self) -> &str {
1418            ""
1419        }
1420
1421        fn schema(&self) -> serde_json::Value {
1422            serde_json::json!({
1423                "type": "object",
1424                "additionalProperties": true
1425            })
1426        }
1427    }
1428
1429    #[async_trait]
1430    impl Tool for RetriableMockTool {
1431        async fn execute(
1432            &self,
1433            _params: serde_json::Value,
1434            _context: &crate::provider::ApplicationContext,
1435        ) -> Result<JobResult, ToolError> {
1436            Err(ToolError::retriable_string("Mock retriable failure"))
1437        }
1438
1439        fn name(&self) -> &str {
1440            &self.name
1441        }
1442
1443        fn description(&self) -> &str {
1444            ""
1445        }
1446
1447        fn schema(&self) -> serde_json::Value {
1448            serde_json::json!({
1449                "type": "object",
1450                "additionalProperties": true
1451            })
1452        }
1453    }
1454
1455    #[tokio::test]
1456    async fn test_tool_worker_process_job() {
1457        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1458            ExecutionConfig::default(),
1459            test_app_context(),
1460        );
1461        let tool = Arc::new(MockTool {
1462            name: "test_tool".to_string(),
1463            should_fail: false,
1464        });
1465        worker.register_tool(tool).await;
1466
1467        let job = Job {
1468            job_id: Uuid::new_v4(),
1469            tool_name: "test_tool".to_string(),
1470            params: serde_json::json!({}),
1471            idempotency_key: None,
1472            max_retries: 3,
1473            retry_count: 0,
1474        };
1475
1476        let result = worker.process_job(job).await.unwrap();
1477        match result {
1478            JobResult::Success { .. } => (),
1479            _ => panic!("Expected success"),
1480        }
1481    }
1482
1483    #[tokio::test]
1484    async fn test_tool_worker_with_idempotency() {
1485        let store = Arc::new(InMemoryIdempotencyStore::new());
1486        let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
1487            .with_idempotency_store(store.clone());
1488
1489        let tool = Arc::new(MockTool {
1490            name: "test_tool".to_string(),
1491            should_fail: false,
1492        });
1493        worker.register_tool(tool).await;
1494
1495        let job = Job {
1496            job_id: Uuid::new_v4(),
1497            tool_name: "test_tool".to_string(),
1498            params: serde_json::json!({}),
1499            idempotency_key: Some("test_key".to_string()),
1500            max_retries: 3,
1501            retry_count: 0,
1502        };
1503
1504        // First execution
1505        let result1 = worker.process_job(job.clone()).await.unwrap();
1506        assert!(result1.is_success());
1507
1508        // Second execution should return cached result
1509        let result2 = worker.process_job(job).await.unwrap();
1510        assert!(result2.is_success());
1511    }
1512
1513    #[tokio::test]
1514    async fn test_tool_worker_with_retries() {
1515        let config = ExecutionConfig {
1516            initial_retry_delay: Duration::from_millis(10),
1517            ..Default::default()
1518        };
1519
1520        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
1521        let tool = Arc::new(MockTool {
1522            name: "test_tool".to_string(),
1523            should_fail: true,
1524        });
1525        worker.register_tool(tool).await;
1526
1527        let job = Job {
1528            job_id: Uuid::new_v4(),
1529            tool_name: "test_tool".to_string(),
1530            params: serde_json::json!({}),
1531            idempotency_key: None,
1532            max_retries: 2,
1533            retry_count: 0,
1534        };
1535
1536        let result = worker.process_job(job).await.unwrap();
1537        match result {
1538            JobResult::Failure { .. } => {
1539                assert!(!result.is_retriable()); // Should not be retriable after exhausting retries
1540            }
1541            _ => panic!("Expected failure"),
1542        }
1543    }
1544
1545    #[tokio::test]
1546    async fn test_tool_worker_tool_not_found() {
1547        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1548            ExecutionConfig::default(),
1549            test_app_context(),
1550        );
1551
1552        let job = Job {
1553            job_id: Uuid::new_v4(),
1554            tool_name: "nonexistent_tool".to_string(),
1555            params: serde_json::json!({}),
1556            idempotency_key: None,
1557            max_retries: 0,
1558            retry_count: 0,
1559        };
1560
1561        let result = worker.process_job(job).await;
1562        assert!(result.is_err());
1563        assert!(result
1564            .unwrap_err()
1565            .to_string()
1566            .contains("Tool 'nonexistent_tool' not found"));
1567    }
1568
1569    #[tokio::test]
1570    async fn test_tool_worker_timeout() {
1571        let config = ExecutionConfig {
1572            default_timeout: Duration::from_millis(10), // Very short timeout
1573            ..Default::default()
1574        };
1575
1576        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
1577        let tool = Arc::new(SlowMockTool {
1578            name: "slow_tool".to_string(),
1579            delay: Duration::from_millis(100),
1580        });
1581        worker.register_tool(tool).await;
1582
1583        let job = Job {
1584            job_id: Uuid::new_v4(),
1585            tool_name: "slow_tool".to_string(),
1586            params: serde_json::json!({}),
1587            idempotency_key: None,
1588            max_retries: 1,
1589            retry_count: 0,
1590        };
1591
1592        let result = worker.process_job(job).await.unwrap();
1593        match result {
1594            JobResult::Failure { error, .. } => {
1595                assert!(error.to_string().to_lowercase().contains("timed out"));
1596            }
1597            _ => panic!("Expected timeout failure"),
1598        }
1599    }
1600
1601    #[tokio::test]
1602    async fn test_tool_worker_with_resource_limits() {
1603        let config = ExecutionConfig::default();
1604        let limits = ResourceLimits::default()
1605            .with_limit("solana_rpc", 2)
1606            .with_limit("evm_rpc", 3);
1607
1608        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context())
1609            .with_resource_limits(limits);
1610
1611        // Test semaphore acquisition for different tool types
1612        let solana_tool = Arc::new(MockTool {
1613            name: "solana_test".to_string(),
1614            should_fail: false,
1615        });
1616        let evm_tool = Arc::new(MockTool {
1617            name: "evm_test".to_string(),
1618            should_fail: false,
1619        });
1620        let web_tool = Arc::new(MockTool {
1621            name: "web_test".to_string(),
1622            should_fail: false,
1623        });
1624        let other_tool = Arc::new(MockTool {
1625            name: "other_test".to_string(),
1626            should_fail: false,
1627        });
1628
1629        worker.register_tool(solana_tool).await;
1630        worker.register_tool(evm_tool).await;
1631        worker.register_tool(web_tool).await;
1632        worker.register_tool(other_tool).await;
1633
1634        // Test different tool name patterns
1635        let jobs = vec![
1636            Job {
1637                job_id: Uuid::new_v4(),
1638                tool_name: "solana_test".to_string(),
1639                params: serde_json::json!({}),
1640                idempotency_key: None,
1641                max_retries: 0,
1642                retry_count: 0,
1643            },
1644            Job {
1645                job_id: Uuid::new_v4(),
1646                tool_name: "evm_test".to_string(),
1647                params: serde_json::json!({}),
1648                idempotency_key: None,
1649                max_retries: 0,
1650                retry_count: 0,
1651            },
1652            Job {
1653                job_id: Uuid::new_v4(),
1654                tool_name: "web_test".to_string(),
1655                params: serde_json::json!({}),
1656                idempotency_key: None,
1657                max_retries: 0,
1658                retry_count: 0,
1659            },
1660            Job {
1661                job_id: Uuid::new_v4(),
1662                tool_name: "other_test".to_string(),
1663                params: serde_json::json!({}),
1664                idempotency_key: None,
1665                max_retries: 0,
1666                retry_count: 0,
1667            },
1668        ];
1669
1670        // All should succeed
1671        for job in jobs {
1672            let result = worker.process_job(job).await.unwrap();
1673            assert!(result.is_success());
1674        }
1675    }
1676
1677    #[tokio::test]
1678    async fn test_tool_worker_idempotency_disabled() {
1679        let config = ExecutionConfig {
1680            enable_idempotency: false,
1681            ..Default::default()
1682        };
1683
1684        let store = Arc::new(InMemoryIdempotencyStore::new());
1685        let worker =
1686            ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
1687
1688        let tool = Arc::new(MockTool {
1689            name: "test_tool".to_string(),
1690            should_fail: false,
1691        });
1692        worker.register_tool(tool).await;
1693
1694        let job = Job {
1695            job_id: Uuid::new_v4(),
1696            tool_name: "test_tool".to_string(),
1697            params: serde_json::json!({}),
1698            idempotency_key: Some("test_key".to_string()),
1699            max_retries: 0,
1700            retry_count: 0,
1701        };
1702
1703        // First execution
1704        let result1 = worker.process_job(job.clone()).await.unwrap();
1705        assert!(result1.is_success());
1706
1707        // Second execution should NOT use cache due to disabled idempotency
1708        let result2 = worker.process_job(job).await.unwrap();
1709        assert!(result2.is_success());
1710
1711        // Verify the key was never set in the store
1712        assert!(store.get("test_key").await.unwrap().is_none());
1713    }
1714
1715    #[tokio::test]
1716    async fn test_tool_worker_metrics() {
1717        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1718            ExecutionConfig::default(),
1719            test_app_context(),
1720        );
1721        let success_tool = Arc::new(MockTool {
1722            name: "success_tool".to_string(),
1723            should_fail: false,
1724        });
1725        let fail_tool = Arc::new(RetriableMockTool {
1726            name: "fail_tool".to_string(),
1727        });
1728
1729        worker.register_tool(success_tool).await;
1730        worker.register_tool(fail_tool).await;
1731
1732        let metrics = worker.metrics();
1733
1734        // Initial state
1735        assert_eq!(
1736            metrics
1737                .jobs_processed
1738                .load(std::sync::atomic::Ordering::Relaxed),
1739            0
1740        );
1741        assert_eq!(
1742            metrics
1743                .jobs_succeeded
1744                .load(std::sync::atomic::Ordering::Relaxed),
1745            0
1746        );
1747        assert_eq!(
1748            metrics
1749                .jobs_failed
1750                .load(std::sync::atomic::Ordering::Relaxed),
1751            0
1752        );
1753        assert_eq!(
1754            metrics
1755                .jobs_retried
1756                .load(std::sync::atomic::Ordering::Relaxed),
1757            0
1758        );
1759
1760        // Process successful job
1761        let success_job = Job {
1762            job_id: Uuid::new_v4(),
1763            tool_name: "success_tool".to_string(),
1764            params: serde_json::json!({}),
1765            idempotency_key: None,
1766            max_retries: 0,
1767            retry_count: 0,
1768        };
1769        worker.process_job(success_job).await.unwrap();
1770        assert_eq!(
1771            metrics
1772                .jobs_succeeded
1773                .load(std::sync::atomic::Ordering::Relaxed),
1774            1
1775        );
1776
1777        // Process failing job with retries
1778        let fail_job = Job {
1779            job_id: Uuid::new_v4(),
1780            tool_name: "fail_tool".to_string(),
1781            params: serde_json::json!({}),
1782            idempotency_key: None,
1783            max_retries: 2,
1784            retry_count: 0,
1785        };
1786        worker.process_job(fail_job).await.unwrap();
1787        assert_eq!(
1788            metrics
1789                .jobs_failed
1790                .load(std::sync::atomic::Ordering::Relaxed),
1791            1
1792        );
1793        assert_eq!(
1794            metrics
1795                .jobs_retried
1796                .load(std::sync::atomic::Ordering::Relaxed),
1797            1
1798        );
1799    }
1800
1801    #[tokio::test]
1802    async fn test_execution_config_default() {
1803        let config = ExecutionConfig::default();
1804        assert_eq!(config.max_concurrency, 10);
1805        assert_eq!(config.default_timeout, Duration::from_secs(30));
1806        assert_eq!(config.max_retries, 3);
1807        assert_eq!(config.initial_retry_delay, Duration::from_millis(100));
1808        assert_eq!(config.max_retry_delay, Duration::from_secs(10));
1809        assert_eq!(config.idempotency_ttl, Duration::from_secs(3600));
1810        assert!(config.enable_idempotency);
1811    }
1812
1813    #[tokio::test]
1814    async fn test_resource_limits() {
1815        let limits = ResourceLimits::default()
1816            .with_limit("test_resource", 5)
1817            .with_limit("another_resource", 10);
1818
1819        assert!(limits.get_semaphore("test_resource").is_some());
1820        assert!(limits.get_semaphore("another_resource").is_some());
1821        assert!(limits.get_semaphore("nonexistent").is_none());
1822
1823        let default_limits = ResourceLimits::default();
1824        assert!(default_limits.get_semaphore("solana_rpc").is_some());
1825        assert!(default_limits.get_semaphore("evm_rpc").is_some());
1826        assert!(default_limits.get_semaphore("http_api").is_some());
1827    }
1828
1829    #[tokio::test]
1830    async fn test_worker_metrics_default() {
1831        let metrics = WorkerMetrics::default();
1832        assert_eq!(
1833            metrics
1834                .jobs_processed
1835                .load(std::sync::atomic::Ordering::Relaxed),
1836            0
1837        );
1838        assert_eq!(
1839            metrics
1840                .jobs_succeeded
1841                .load(std::sync::atomic::Ordering::Relaxed),
1842            0
1843        );
1844        assert_eq!(
1845            metrics
1846                .jobs_failed
1847                .load(std::sync::atomic::Ordering::Relaxed),
1848            0
1849        );
1850        assert_eq!(
1851            metrics
1852                .jobs_retried
1853                .load(std::sync::atomic::Ordering::Relaxed),
1854            0
1855        );
1856    }
1857
1858    #[tokio::test]
1859    async fn test_tool_worker_clone() {
1860        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1861            ExecutionConfig::default(),
1862            test_app_context(),
1863        );
1864        let tool = Arc::new(MockTool {
1865            name: "test_tool".to_string(),
1866            should_fail: false,
1867        });
1868        worker.register_tool(tool).await;
1869
1870        let cloned_worker = worker.clone();
1871
1872        // Both workers should have access to the same tools
1873        assert_eq!(worker.tools.len(), 1);
1874        assert_eq!(cloned_worker.tools.len(), 1);
1875
1876        // Test processing with cloned worker
1877        let job = Job {
1878            job_id: Uuid::new_v4(),
1879            tool_name: "test_tool".to_string(),
1880            params: serde_json::json!({}),
1881            idempotency_key: None,
1882            max_retries: 0,
1883            retry_count: 0,
1884        };
1885
1886        let result = cloned_worker.process_job(job).await.unwrap();
1887        assert!(result.is_success());
1888    }
1889
1890    #[tokio::test]
1891    async fn test_tool_worker_run_loop() {
1892        use crate::queue::InMemoryJobQueue;
1893
1894        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1895            ExecutionConfig::default(),
1896            test_app_context(),
1897        );
1898        let tool = Arc::new(MockTool {
1899            name: "test_tool".to_string(),
1900            should_fail: false,
1901        });
1902        worker.register_tool(tool).await;
1903
1904        let queue = Arc::new(InMemoryJobQueue::new());
1905
1906        // Enqueue a job
1907        let job = Job {
1908            job_id: Uuid::new_v4(),
1909            tool_name: "test_tool".to_string(),
1910            params: serde_json::json!({}),
1911            idempotency_key: None,
1912            max_retries: 0,
1913            retry_count: 0,
1914        };
1915        queue.enqueue(job).await.unwrap();
1916
1917        // Start the worker run loop with cancellation token
1918        let worker_clone = worker.clone();
1919        let queue_clone = queue.clone();
1920        let cancellation_token = tokio_util::sync::CancellationToken::new();
1921        let token_clone = cancellation_token.clone();
1922
1923        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
1924
1925        // Give it time to process the job
1926        tokio::time::sleep(Duration::from_millis(50)).await;
1927
1928        // Signal shutdown
1929        cancellation_token.cancel();
1930
1931        // Check that metrics were updated
1932        let metrics = worker.metrics();
1933        assert!(
1934            metrics
1935                .jobs_processed
1936                .load(std::sync::atomic::Ordering::Relaxed)
1937                > 0
1938        );
1939
1940        handle.await.unwrap().unwrap();
1941    }
1942
1943    #[tokio::test]
1944    async fn test_idempotency_cache_hit() {
1945        let store = Arc::new(InMemoryIdempotencyStore::new());
1946        let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
1947            .with_idempotency_store(store.clone());
1948
1949        let tool = Arc::new(MockTool {
1950            name: "test_tool".to_string(),
1951            should_fail: false,
1952        });
1953        worker.register_tool(tool).await;
1954
1955        // Pre-populate the cache
1956        let cached_result = JobResult::Success {
1957            value: serde_json::json!({"cached": true}),
1958            tx_hash: Some("cached_tx_hash".to_string()),
1959        };
1960        store
1961            .set(
1962                "cache_key",
1963                Arc::new(cached_result),
1964                Duration::from_secs(60),
1965            )
1966            .await
1967            .unwrap();
1968
1969        let job = Job {
1970            job_id: Uuid::new_v4(),
1971            tool_name: "test_tool".to_string(),
1972            params: serde_json::json!({}),
1973            idempotency_key: Some("cache_key".to_string()),
1974            max_retries: 0,
1975            retry_count: 0,
1976        };
1977
1978        // Should return cached result without executing the tool
1979        let result = worker.process_job(job).await.unwrap();
1980        match result {
1981            JobResult::Success { value, tx_hash } => {
1982                assert_eq!(value, serde_json::json!({"cached": true}));
1983                assert_eq!(tx_hash, Some("cached_tx_hash".to_string()));
1984            }
1985            _ => panic!("Expected cached success result"),
1986        }
1987    }
1988
1989    #[tokio::test]
1990    async fn test_tool_worker_unknown_error_fallback() {
1991        // Create a worker with a job that will fail with max retries
1992        // but have no last_error set to trigger the "Unknown error" fallback
1993        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1994            ExecutionConfig::default(),
1995            test_app_context(),
1996        );
1997
1998        // Don't register any tool - this will cause tool not found error
1999        let job = Job {
2000            job_id: Uuid::new_v4(),
2001            tool_name: "nonexistent_tool".to_string(),
2002            params: serde_json::json!({}),
2003            idempotency_key: None,
2004            max_retries: 0,
2005            retry_count: 0,
2006        };
2007
2008        // This should fail with tool not found, not unknown error
2009        let result = worker.process_job(job).await;
2010        assert!(result.is_err());
2011
2012        // The unknown error fallback is actually hard to trigger in normal flow
2013        // It would only happen if there's a bug in the retry logic where
2014        // attempts > max_retries but last_error is None
2015    }
2016
2017    #[tokio::test]
2018    async fn test_run_loop_error_handling() {
2019        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2020            ExecutionConfig::default(),
2021            test_app_context(),
2022        );
2023        let error_queue = Arc::new(ErrorQueue::default());
2024
2025        // Start run loop with cancellation token
2026        let worker_clone = worker.clone();
2027        let queue_clone = error_queue.clone();
2028        let cancellation_token = tokio_util::sync::CancellationToken::new();
2029        let token_clone = cancellation_token.clone();
2030
2031        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2032
2033        // Give it time to encounter the error
2034        tokio::time::sleep(Duration::from_millis(50)).await;
2035
2036        // Signal shutdown
2037        cancellation_token.cancel();
2038        handle.await.unwrap().unwrap();
2039    }
2040
2041    #[tokio::test]
2042    async fn test_run_loop_empty_queue() {
2043        use crate::queue::InMemoryJobQueue;
2044
2045        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2046            ExecutionConfig::default(),
2047            test_app_context(),
2048        );
2049        let queue = Arc::new(InMemoryJobQueue::new());
2050
2051        // Start run loop with cancellation token - should encounter Ok(None) from empty queue
2052        let worker_clone = worker.clone();
2053        let queue_clone = queue.clone();
2054        let cancellation_token = tokio_util::sync::CancellationToken::new();
2055        let token_clone = cancellation_token.clone();
2056
2057        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2058
2059        // Give it time to process
2060        tokio::time::sleep(Duration::from_millis(50)).await;
2061
2062        // Signal shutdown
2063        cancellation_token.cancel();
2064        handle.await.unwrap().unwrap();
2065    }
2066
2067    #[tokio::test]
2068    async fn test_run_loop_with_failing_jobs() {
2069        use crate::queue::InMemoryJobQueue;
2070
2071        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2072            ExecutionConfig::default(),
2073            test_app_context(),
2074        );
2075        let fail_tool = Arc::new(MockTool {
2076            name: "fail_tool".to_string(),
2077            should_fail: true,
2078        });
2079        worker.register_tool(fail_tool).await;
2080
2081        let queue = Arc::new(InMemoryJobQueue::new());
2082
2083        // Enqueue a failing job
2084        let job = Job {
2085            job_id: Uuid::new_v4(),
2086            tool_name: "fail_tool".to_string(),
2087            params: serde_json::json!({}),
2088            idempotency_key: None,
2089            max_retries: 0,
2090            retry_count: 0,
2091        };
2092        queue.enqueue(job).await.unwrap();
2093
2094        // Start run loop with cancellation token
2095        let worker_clone = worker.clone();
2096        let queue_clone = queue.clone();
2097        let cancellation_token = tokio_util::sync::CancellationToken::new();
2098        let token_clone = cancellation_token.clone();
2099
2100        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2101
2102        // Give it time to process the failing job
2103        tokio::time::sleep(Duration::from_millis(50)).await;
2104
2105        // Signal shutdown
2106        cancellation_token.cancel();
2107        handle.await.unwrap().unwrap();
2108
2109        // Verify metrics were updated
2110        let metrics = worker.metrics();
2111        assert!(
2112            metrics
2113                .jobs_processed
2114                .load(std::sync::atomic::Ordering::Relaxed)
2115                > 0
2116        );
2117    }
2118
2119    #[tokio::test]
2120    async fn test_comprehensive_metrics_tracking() {
2121        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2122            ExecutionConfig::default(),
2123            test_app_context(),
2124        );
2125        let success_tool = Arc::new(MockTool {
2126            name: "success_tool".to_string(),
2127            should_fail: false,
2128        });
2129        let fail_tool = Arc::new(RetriableMockTool {
2130            name: "fail_tool".to_string(),
2131        });
2132        worker.register_tool(success_tool).await;
2133        worker.register_tool(fail_tool).await;
2134
2135        let metrics = worker.metrics();
2136
2137        // Process a successful job
2138        let success_job = Job {
2139            job_id: Uuid::new_v4(),
2140            tool_name: "success_tool".to_string(),
2141            params: serde_json::json!({}),
2142            idempotency_key: None,
2143            max_retries: 0,
2144            retry_count: 0,
2145        };
2146        let result = worker.process_job(success_job).await.unwrap();
2147        assert!(result.is_success());
2148
2149        // Verify jobs_succeeded was incremented (line 232)
2150        assert_eq!(
2151            metrics
2152                .jobs_succeeded
2153                .load(std::sync::atomic::Ordering::Relaxed),
2154            1
2155        );
2156
2157        // Process a failing job with retries
2158        let fail_job = Job {
2159            job_id: Uuid::new_v4(),
2160            tool_name: "fail_tool".to_string(),
2161            params: serde_json::json!({}),
2162            idempotency_key: None,
2163            max_retries: 2,
2164            retry_count: 0,
2165        };
2166        let result = worker.process_job(fail_job).await.unwrap();
2167        assert!(!result.is_success());
2168
2169        // Verify jobs_retried was incremented (line 250)
2170        assert_eq!(
2171            metrics
2172                .jobs_retried
2173                .load(std::sync::atomic::Ordering::Relaxed),
2174            1
2175        );
2176
2177        // Verify jobs_failed was incremented (line 264)
2178        assert_eq!(
2179            metrics
2180                .jobs_failed
2181                .load(std::sync::atomic::Ordering::Relaxed),
2182            1
2183        );
2184    }
2185
2186    #[tokio::test]
2187    async fn test_debug_logging_in_retries() {
2188        let config = ExecutionConfig {
2189            initial_retry_delay: Duration::from_millis(1),
2190            ..Default::default()
2191        };
2192
2193        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2194        let tool = Arc::new(MockTool {
2195            name: "retry_tool".to_string(),
2196            should_fail: true,
2197        });
2198        worker.register_tool(tool).await;
2199
2200        let job = Job {
2201            job_id: Uuid::new_v4(),
2202            tool_name: "retry_tool".to_string(),
2203            params: serde_json::json!({}),
2204            idempotency_key: None,
2205            max_retries: 1,
2206            retry_count: 0,
2207        };
2208
2209        // This should trigger debug logging in the retry loop (lines 205-208)
2210        let _result = worker.process_job(job).await.unwrap();
2211    }
2212
2213    #[tokio::test]
2214    async fn test_worker_startup_logging() {
2215        use crate::queue::InMemoryJobQueue;
2216
2217        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2218            ExecutionConfig::default(),
2219            test_app_context(),
2220        );
2221        let tool = Arc::new(MockTool {
2222            name: "startup_tool".to_string(),
2223            should_fail: false,
2224        });
2225        worker.register_tool(tool).await;
2226
2227        let queue = Arc::new(InMemoryJobQueue::new());
2228
2229        // This should trigger the startup info log
2230        let worker_clone = worker.clone();
2231        let queue_clone = queue.clone();
2232        let cancellation_token = tokio_util::sync::CancellationToken::new();
2233        let token_clone = cancellation_token.clone();
2234
2235        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2236
2237        // Give it time to start up
2238        tokio::time::sleep(Duration::from_millis(10)).await;
2239
2240        // Signal shutdown
2241        cancellation_token.cancel();
2242        handle.await.unwrap().unwrap();
2243    }
2244
2245    #[tokio::test]
2246    async fn test_timeout_specific_error() {
2247        let config = ExecutionConfig {
2248            default_timeout: Duration::from_millis(1), // Very short timeout
2249            ..Default::default()
2250        };
2251
2252        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2253        let tool = Arc::new(SlowMockTool {
2254            name: "timeout_tool".to_string(),
2255            delay: Duration::from_millis(50),
2256        });
2257        worker.register_tool(tool).await;
2258
2259        let job = Job {
2260            job_id: Uuid::new_v4(),
2261            tool_name: "timeout_tool".to_string(),
2262            params: serde_json::json!({}),
2263            idempotency_key: None,
2264            max_retries: 0,
2265            retry_count: 0,
2266        };
2267
2268        // This should specifically hit the timeout error assignment (line 240)
2269        let result = worker.process_job(job).await.unwrap();
2270        match result {
2271            JobResult::Failure { error, .. } => {
2272                assert!(error.to_string().to_lowercase().contains("timed out"));
2273            }
2274            _ => panic!("Expected timeout failure"),
2275        }
2276    }
2277
2278    #[tokio::test]
2279    async fn test_resource_matching_edge_cases() {
2280        let limits = ResourceLimits::default()
2281            .with_limit("solana_rpc", 1)
2282            .with_limit("evm_rpc", 1);
2283
2284        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2285            ExecutionConfig::default(),
2286            test_app_context(),
2287        )
2288        .with_resource_limits(limits);
2289
2290        // Register tools with different name patterns to exercise line 278
2291        let solana_tool = Arc::new(MockTool {
2292            name: "solana_balance".to_string(), // Should match solana_ pattern
2293            should_fail: false,
2294        });
2295        let evm_tool = Arc::new(MockTool {
2296            name: "evm_call".to_string(), // Should match evm_ pattern
2297            should_fail: false,
2298        });
2299        let web_tool = Arc::new(MockTool {
2300            name: "web_fetch".to_string(), // Should match web_ pattern
2301            should_fail: false,
2302        });
2303        let other_tool = Arc::new(MockTool {
2304            name: "other_operation".to_string(), // Should use default semaphore
2305            should_fail: false,
2306        });
2307
2308        worker.register_tool(solana_tool).await;
2309        worker.register_tool(evm_tool).await;
2310        worker.register_tool(web_tool).await;
2311        worker.register_tool(other_tool).await;
2312
2313        // Process jobs to exercise the acquire_semaphore method
2314        let job1 = Job {
2315            job_id: Uuid::new_v4(),
2316            tool_name: "solana_balance".to_string(),
2317            params: serde_json::json!({}),
2318            idempotency_key: None,
2319            max_retries: 0,
2320            retry_count: 0,
2321        };
2322
2323        let _result = worker.process_job(job1).await.unwrap();
2324
2325        let job2 = Job {
2326            job_id: Uuid::new_v4(),
2327            tool_name: "other_operation".to_string(),
2328            params: serde_json::json!({}),
2329            idempotency_key: None,
2330            max_retries: 0,
2331            retry_count: 0,
2332        };
2333
2334        let _result = worker.process_job(job2).await.unwrap();
2335    }
2336
2337    #[tokio::test]
2338    async fn test_execution_config_custom_values() {
2339        let config = ExecutionConfig {
2340            max_concurrency: 25,
2341            default_timeout: Duration::from_secs(60),
2342            max_retries: 5,
2343            initial_retry_delay: Duration::from_millis(200),
2344            max_retry_delay: Duration::from_secs(20),
2345            idempotency_ttl: Duration::from_secs(7200),
2346            enable_idempotency: false,
2347        };
2348
2349        assert_eq!(config.max_concurrency, 25);
2350        assert_eq!(config.default_timeout, Duration::from_secs(60));
2351        assert_eq!(config.max_retries, 5);
2352        assert_eq!(config.initial_retry_delay, Duration::from_millis(200));
2353        assert_eq!(config.max_retry_delay, Duration::from_secs(20));
2354        assert_eq!(config.idempotency_ttl, Duration::from_secs(7200));
2355        assert!(!config.enable_idempotency);
2356    }
2357
2358    #[tokio::test]
2359    async fn test_resource_limits_new() {
2360        let limits = ResourceLimits::default();
2361        assert!(limits.get_semaphore("any_resource").is_none());
2362    }
2363
2364    #[tokio::test]
2365    async fn test_resource_limits_with_limit_chaining() {
2366        let limits = ResourceLimits::default()
2367            .with_limit("first", 1)
2368            .with_limit("second", 2)
2369            .with_limit("third", 3);
2370
2371        assert!(limits.get_semaphore("first").is_some());
2372        assert!(limits.get_semaphore("second").is_some());
2373        assert!(limits.get_semaphore("third").is_some());
2374        assert!(limits.get_semaphore("fourth").is_none());
2375    }
2376
2377    #[tokio::test]
2378    async fn test_tool_worker_new_custom_config() {
2379        let config = ExecutionConfig {
2380            max_concurrency: 5,
2381            ..Default::default()
2382        };
2383        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2384
2385        // Verify worker was created with custom concurrency
2386        // We can't directly access default_semaphore, but we can verify through the config
2387        assert_eq!(worker.config.max_concurrency, 5);
2388    }
2389
2390    #[tokio::test]
2391    async fn test_tool_worker_register_tool_replacement() {
2392        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2393            ExecutionConfig::default(),
2394            test_app_context(),
2395        );
2396
2397        // Register initial tool
2398        let tool1 = Arc::new(MockTool {
2399            name: "test_tool".to_string(),
2400            should_fail: false,
2401        });
2402        worker.register_tool(tool1).await;
2403        assert_eq!(worker.tools.len(), 1);
2404
2405        // Register tool with same name - should replace
2406        let tool2 = Arc::new(MockTool {
2407            name: "test_tool".to_string(),
2408            should_fail: true,
2409        });
2410        worker.register_tool(tool2).await;
2411        assert_eq!(worker.tools.len(), 1);
2412
2413        // Verify the new tool was used (it should fail)
2414        let job = Job {
2415            job_id: Uuid::new_v4(),
2416            tool_name: "test_tool".to_string(),
2417            params: serde_json::json!({}),
2418            idempotency_key: None,
2419            max_retries: 0,
2420            retry_count: 0,
2421        };
2422
2423        let result = worker.process_job(job).await.unwrap();
2424        assert!(!result.is_success());
2425    }
2426
2427    #[tokio::test]
2428    async fn test_idempotency_store_error_handling() {
2429        let store = Arc::new(ErrorIdempotencyStore::default());
2430        let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2431            .with_idempotency_store(store.clone());
2432
2433        let tool = Arc::new(MockTool {
2434            name: "test_tool".to_string(),
2435            should_fail: false,
2436        });
2437        worker.register_tool(tool).await;
2438
2439        let job = Job {
2440            job_id: Uuid::new_v4(),
2441            tool_name: "test_tool".to_string(),
2442            params: serde_json::json!({}),
2443            idempotency_key: Some("test_key".to_string()),
2444            max_retries: 0,
2445            retry_count: 0,
2446        };
2447
2448        // Should fail with idempotency store error
2449        let result = worker.process_job(job).await;
2450        assert!(result.is_err());
2451        assert!(result.unwrap_err().to_string().contains("Store error"));
2452    }
2453
2454    #[tokio::test]
2455    async fn test_no_idempotency_key_no_cache_check() {
2456        let store = Arc::new(InMemoryIdempotencyStore::new());
2457        let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2458            .with_idempotency_store(store.clone());
2459
2460        let tool = Arc::new(MockTool {
2461            name: "test_tool".to_string(),
2462            should_fail: false,
2463        });
2464        worker.register_tool(tool).await;
2465
2466        let job = Job {
2467            job_id: Uuid::new_v4(),
2468            tool_name: "test_tool".to_string(),
2469            params: serde_json::json!({}),
2470            idempotency_key: None, // No idempotency key
2471            max_retries: 0,
2472            retry_count: 0,
2473        };
2474
2475        // Should execute normally without cache check
2476        let result = worker.process_job(job).await.unwrap();
2477        assert!(result.is_success());
2478    }
2479
2480    #[tokio::test]
2481    async fn test_no_idempotency_store_no_cache() {
2482        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2483            ExecutionConfig::default(),
2484            test_app_context(),
2485        );
2486        // Don't set idempotency store
2487
2488        let tool = Arc::new(MockTool {
2489            name: "test_tool".to_string(),
2490            should_fail: false,
2491        });
2492        worker.register_tool(tool).await;
2493
2494        let job = Job {
2495            job_id: Uuid::new_v4(),
2496            tool_name: "test_tool".to_string(),
2497            params: serde_json::json!({}),
2498            idempotency_key: Some("test_key".to_string()),
2499            max_retries: 0,
2500            retry_count: 0,
2501        };
2502
2503        // Should execute normally without cache check
2504        let result = worker.process_job(job).await.unwrap();
2505        assert!(result.is_success());
2506    }
2507
2508    #[tokio::test]
2509    async fn test_cache_result_with_no_idempotency_key() {
2510        let store = Arc::new(InMemoryIdempotencyStore::new());
2511        let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2512            .with_idempotency_store(store.clone());
2513
2514        let tool = Arc::new(MockTool {
2515            name: "test_tool".to_string(),
2516            should_fail: false,
2517        });
2518        worker.register_tool(tool).await;
2519
2520        let job = Job {
2521            job_id: Uuid::new_v4(),
2522            tool_name: "test_tool".to_string(),
2523            params: serde_json::json!({}),
2524            idempotency_key: None, // No idempotency key
2525            max_retries: 0,
2526            retry_count: 0,
2527        };
2528
2529        // Should execute and not attempt to cache
2530        let result = worker.process_job(job).await.unwrap();
2531        assert!(result.is_success());
2532    }
2533
2534    #[tokio::test]
2535    async fn test_cache_result_with_idempotency_disabled() {
2536        let config = ExecutionConfig {
2537            enable_idempotency: false,
2538            ..Default::default()
2539        };
2540
2541        let store = Arc::new(InMemoryIdempotencyStore::new());
2542        let worker =
2543            ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
2544
2545        let tool = Arc::new(MockTool {
2546            name: "test_tool".to_string(),
2547            should_fail: false,
2548        });
2549        worker.register_tool(tool).await;
2550
2551        let job = Job {
2552            job_id: Uuid::new_v4(),
2553            tool_name: "test_tool".to_string(),
2554            params: serde_json::json!({}),
2555            idempotency_key: Some("test_key".to_string()),
2556            max_retries: 0,
2557            retry_count: 0,
2558        };
2559
2560        // Should execute but not cache due to disabled idempotency
2561        let result = worker.process_job(job).await.unwrap();
2562        assert!(result.is_success());
2563
2564        // Verify nothing was cached
2565        assert!(store.get("test_key").await.unwrap().is_none());
2566    }
2567
2568    #[tokio::test]
2569    async fn test_cache_result_with_no_store() {
2570        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2571            ExecutionConfig::default(),
2572            test_app_context(),
2573        );
2574        // Don't set idempotency store
2575
2576        let tool = Arc::new(MockTool {
2577            name: "test_tool".to_string(),
2578            should_fail: false,
2579        });
2580        worker.register_tool(tool).await;
2581
2582        let job = Job {
2583            job_id: Uuid::new_v4(),
2584            tool_name: "test_tool".to_string(),
2585            params: serde_json::json!({}),
2586            idempotency_key: Some("test_key".to_string()),
2587            max_retries: 0,
2588            retry_count: 0,
2589        };
2590
2591        // Should execute but not cache due to no store
2592        let result = worker.process_job(job).await.unwrap();
2593        assert!(result.is_success());
2594    }
2595
2596    // Tests for create_backoff_strategy and wait_with_backoff removed
2597    // These methods are now handled by the unified retry helper in retry.rs
2598
2599    #[tokio::test]
2600    async fn test_acquire_semaphore_web_prefix() {
2601        let limits = ResourceLimits::default().with_limit("http_api", 1);
2602
2603        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2604            ExecutionConfig::default(),
2605            test_app_context(),
2606        )
2607        .with_resource_limits(limits);
2608
2609        // Test web_ prefix maps to http_api
2610        let _permit = worker.acquire_semaphore("web_fetch").await.unwrap();
2611    }
2612
2613    #[tokio::test]
2614    async fn test_acquire_semaphore_no_matching_resource() {
2615        let limits = ResourceLimits::default(); // Empty limits
2616
2617        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2618            ExecutionConfig::default(),
2619            test_app_context(),
2620        )
2621        .with_resource_limits(limits);
2622
2623        // Should fall back to default semaphore
2624        let _permit = worker.acquire_semaphore("solana_test").await.unwrap();
2625        let _permit = worker.acquire_semaphore("random_tool").await.unwrap();
2626    }
2627
2628    #[tokio::test]
2629    async fn test_run_loop_job_processing_success_logging() {
2630        use crate::queue::InMemoryJobQueue;
2631
2632        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2633            ExecutionConfig::default(),
2634            test_app_context(),
2635        );
2636        let tool = Arc::new(MockTool {
2637            name: "log_test_tool".to_string(),
2638            should_fail: false,
2639        });
2640        worker.register_tool(tool).await;
2641
2642        let queue = Arc::new(InMemoryJobQueue::new());
2643
2644        // Enqueue a successful job
2645        let job = Job {
2646            job_id: Uuid::new_v4(),
2647            tool_name: "log_test_tool".to_string(),
2648            params: serde_json::json!({}),
2649            idempotency_key: None,
2650            max_retries: 0,
2651            retry_count: 0,
2652        };
2653        queue.enqueue(job).await.unwrap();
2654
2655        let worker_clone = worker.clone();
2656        let queue_clone = queue.clone();
2657        let cancellation_token = tokio_util::sync::CancellationToken::new();
2658        let token_clone = cancellation_token.clone();
2659
2660        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2661
2662        // Give it time to process
2663        tokio::time::sleep(Duration::from_millis(50)).await;
2664
2665        cancellation_token.cancel();
2666        handle.await.unwrap().unwrap();
2667
2668        // Check that jobs_processed was incremented (line 1274)
2669        let metrics = worker.metrics();
2670        assert!(
2671            metrics
2672                .jobs_processed
2673                .load(std::sync::atomic::Ordering::Relaxed)
2674                > 0
2675        );
2676    }
2677
2678    #[tokio::test]
2679    async fn test_run_loop_worker_error_logging() {
2680        use crate::queue::InMemoryJobQueue;
2681
2682        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2683            ExecutionConfig::default(),
2684            test_app_context(),
2685        );
2686        // Don't register any tools to cause WorkerError
2687
2688        let queue = Arc::new(InMemoryJobQueue::new());
2689
2690        // Enqueue a job for non-existent tool
2691        let job = Job {
2692            job_id: Uuid::new_v4(),
2693            tool_name: "nonexistent_tool".to_string(),
2694            params: serde_json::json!({}),
2695            idempotency_key: None,
2696            max_retries: 0,
2697            retry_count: 0,
2698        };
2699        queue.enqueue(job).await.unwrap();
2700
2701        let worker_clone = worker.clone();
2702        let queue_clone = queue.clone();
2703        let cancellation_token = tokio_util::sync::CancellationToken::new();
2704        let token_clone = cancellation_token.clone();
2705
2706        let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2707
2708        // Give it time to process and hit the error path
2709        tokio::time::sleep(Duration::from_millis(50)).await;
2710
2711        cancellation_token.cancel();
2712        handle.await.unwrap().unwrap();
2713    }
2714
2715    #[tokio::test]
2716    async fn test_semaphore_acquisition_error() {
2717        // This test is tricky since semaphore acquisition rarely fails
2718        // But we can test the error path by creating a scenario where it might
2719        let config = ExecutionConfig {
2720            max_concurrency: 1,
2721            ..Default::default()
2722        };
2723
2724        let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2725        let tool = Arc::new(MockTool {
2726            name: "test_tool".to_string(),
2727            should_fail: false,
2728        });
2729        worker.register_tool(tool).await;
2730
2731        let job = Job {
2732            job_id: Uuid::new_v4(),
2733            tool_name: "test_tool".to_string(),
2734            params: serde_json::json!({}),
2735            idempotency_key: None,
2736            max_retries: 0,
2737            retry_count: 0,
2738        };
2739
2740        // Normal case should work
2741        let result = worker.process_job(job).await;
2742        assert!(result.is_ok());
2743    }
2744
2745    #[test]
2746    fn test_tool_trait_description_method() {
2747        let tool = MockTool {
2748            name: "test".to_string(),
2749            should_fail: false,
2750        };
2751
2752        // Test that description method returns expected value
2753        assert_eq!(tool.description(), "");
2754    }
2755
2756    #[test]
2757    fn test_mock_tool_name_method() {
2758        let tool = MockTool {
2759            name: "test_name".to_string(),
2760            should_fail: false,
2761        };
2762
2763        assert_eq!(tool.name(), "test_name");
2764    }
2765
2766    #[test]
2767    fn test_slow_mock_tool_name_method() {
2768        let tool = SlowMockTool {
2769            name: "slow_test".to_string(),
2770            delay: Duration::from_millis(1),
2771        };
2772
2773        assert_eq!(tool.name(), "slow_test");
2774        assert_eq!(tool.description(), "");
2775    }
2776
2777    #[test]
2778    fn test_error_queue_new() {
2779        let _queue = ErrorQueue::default();
2780        // Just testing construction
2781    }
2782
2783    #[derive(Default)]
2784    struct ErrorIdempotencyStore {
2785        _phantom: std::marker::PhantomData<()>,
2786    }
2787
2788    #[async_trait]
2789    impl crate::idempotency::IdempotencyStore for ErrorIdempotencyStore {
2790        async fn get(&self, _key: &str) -> anyhow::Result<Option<Arc<JobResult>>> {
2791            Err(anyhow::anyhow!("Store error"))
2792        }
2793
2794        async fn set(
2795            &self,
2796            _key: &str,
2797            _result: Arc<JobResult>,
2798            _ttl: Duration,
2799        ) -> anyhow::Result<()> {
2800            Err(anyhow::anyhow!("Store error"))
2801        }
2802
2803        async fn remove(&self, _key: &str) -> anyhow::Result<()> {
2804            Err(anyhow::anyhow!("Store error"))
2805        }
2806    }
2807
2808    struct SlowMockTool {
2809        name: String,
2810        delay: Duration,
2811    }
2812
2813    #[async_trait]
2814    impl Tool for SlowMockTool {
2815        async fn execute(
2816            &self,
2817            _params: serde_json::Value,
2818            _context: &crate::provider::ApplicationContext,
2819        ) -> Result<JobResult, ToolError> {
2820            tokio::time::sleep(self.delay).await;
2821            Ok(JobResult::Success {
2822                value: serde_json::json!({"result": "slow_success"}),
2823                tx_hash: None,
2824            })
2825        }
2826
2827        fn name(&self) -> &str {
2828            &self.name
2829        }
2830
2831        fn description(&self) -> &str {
2832            ""
2833        }
2834
2835        fn schema(&self) -> serde_json::Value {
2836            serde_json::json!({
2837                "type": "object",
2838                "additionalProperties": true
2839            })
2840        }
2841    }
2842
2843    #[derive(Default)]
2844    struct ErrorQueue {
2845        _phantom: std::marker::PhantomData<()>,
2846    }
2847
2848    #[async_trait]
2849    impl crate::queue::JobQueue for ErrorQueue {
2850        async fn enqueue(&self, _job: crate::jobs::Job) -> anyhow::Result<()> {
2851            Err(anyhow::anyhow!("Queue error"))
2852        }
2853
2854        async fn dequeue(&self) -> anyhow::Result<Option<crate::jobs::Job>> {
2855            Err(anyhow::anyhow!("Dequeue error"))
2856        }
2857
2858        async fn dequeue_with_timeout(
2859            &self,
2860            _timeout: Duration,
2861        ) -> anyhow::Result<Option<crate::jobs::Job>> {
2862            Err(anyhow::anyhow!("Dequeue timeout error"))
2863        }
2864
2865        async fn len(&self) -> anyhow::Result<usize> {
2866            Err(anyhow::anyhow!("Len error"))
2867        }
2868    }
2869}