riglr_core/tool.rs
1//! Tool orchestration and execution framework.
2//!
3//! Provides the ToolWorker abstraction for coordinating tool execution
4//! with proper error handling, logging, and integration with rig agents.
5//!
6//! This module provides the core abstractions for executing tools in a resilient,
7//! asynchronous manner with support for retries, timeouts, and job queuing.
8//!
9//! ## Overview
10//!
11//! The tool execution system is designed around several key components:
12//!
13//! - **[`Tool`]** - The core trait defining executable tools
14//! - **[`ToolWorker`]** - The execution engine that processes jobs using registered tools
15//! - **[`ExecutionConfig`]** - Configuration for retry behavior, timeouts, and resource limits
16//! - **[`ResourceLimits`]** - Fine-grained control over resource usage per tool type
17//! - **[`WorkerMetrics`]** - Performance and operational metrics for monitoring
18//!
19//! ## Key Features
20//!
21//! ### Resilient Execution
22//! - **Exponential backoff** retry logic with configurable delays
23//! - **Timeout protection** to prevent hanging operations
24//! - **Error classification** to distinguish retriable vs permanent failures
25//! - **Idempotency support** to safely retry operations
26//!
27//! ### Resource Management
28//! - **Semaphore-based limits** for different resource types (RPC calls, HTTP requests)
29//! - **Concurrent execution** with configurable limits per resource
30//! - **Tool-specific resource mapping** based on naming patterns
31//!
32//! ### Monitoring and Observability
33//! - **Comprehensive metrics** tracking success/failure rates
34//! - **Structured logging** with correlation IDs
35//! - **Performance monitoring** with execution timings
36//!
37//! ## Examples
38//!
39//! ### Basic Tool Implementation
40//!
41//! ```ignore
42//! use riglr_core::{Tool, JobResult};
43//! use async_trait::async_trait;
44//! use serde_json::Value;
45//!
46//! struct SimpleCalculator;
47//!
48//! #[async_trait]
49//! impl Tool for SimpleCalculator {
50//! async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
51//! let a = params["a"].as_f64().ok_or("Missing parameter 'a'")?;
52//! let b = params["b"].as_f64().ok_or("Missing parameter 'b'")?;
53//! let operation = params["op"].as_str().unwrap_or("add");
54//!
55//! let result = match operation {
56//! "add" => a + b,
57//! "multiply" => a * b,
58//! "divide" => {
59//! if b == 0.0 {
60//! return Err(ToolError::permanent_string("Division by zero", "Cannot divide by zero"));
61//! }
62//! a / b
63//! }
64//! _ => return Err(ToolError::permanent_string("Unknown operation", "Operation not supported")),
65//! };
66//!
67//! Ok(JobResult::success(&result)?)
68//! }
69//!
//!     fn name(&self) -> &str {
//!         "calculator"
//!     }
//!
//!     fn description(&self) -> &str {
//!         "Performs basic arithmetic on two numbers"
//!     }
73//! }
74//! ```
75//!
76//! ### Worker Setup and Execution
77//!
78//! ```ignore
79//! use riglr_core::{
80//! ToolWorker, ExecutionConfig, ResourceLimits, Job,
81//! idempotency::InMemoryIdempotencyStore
82//! };
83//! use std::{sync::Arc, time::Duration};
84//!
85//! # async fn example() -> anyhow::Result<()> {
86//! // Configure execution behavior
87//! let config = ExecutionConfig {
88//! max_concurrency: 20,
89//! default_timeout: Duration::from_secs(60),
90//! max_retries: 3,
91//! initial_retry_delay: Duration::from_millis(100),
92//! max_retry_delay: Duration::from_secs(30),
93//! enable_idempotency: true,
94//! ..Default::default()
95//! };
96//!
97//! // Set up resource limits
98//! let limits = ResourceLimits::default()
99//! .with_limit("solana_rpc", 5) // Limit Solana RPC calls
100//! .with_limit("evm_rpc", 10) // Limit EVM RPC calls
101//! .with_limit("http_api", 20); // Limit HTTP API calls
102//!
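//! // Create worker with idempotency store.
//! // `app_context` is the `ApplicationContext` required by `ToolWorker::new`;
//! // it is assumed to have been built earlier by the application.
//! let store = Arc::new(InMemoryIdempotencyStore::new());
//! let worker = ToolWorker::new(config, app_context)
//!     .with_idempotency_store(store)
//!     .with_resource_limits(limits);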
108//!
109//! // Register tools
110//! # use riglr_core::{Tool, JobResult};
111//! # use async_trait::async_trait;
112//! # struct SimpleCalculator;
113//! # #[async_trait]
114//! # impl Tool for SimpleCalculator {
115//! # async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
116//! # Ok(JobResult::success(&42)?)
117//! # }
//! #     fn name(&self) -> &str { "calculator" }
//! #     fn description(&self) -> &str { "Performs basic arithmetic on two numbers" }
119//! # }
120//! worker.register_tool(Arc::new(SimpleCalculator)).await;
121//!
122//! // Process a job
123//! let job = Job::new_idempotent(
124//! "calculator",
125//! &serde_json::json!({"a": 10, "b": 5, "op": "add"}),
126//! 3, // max retries
127//! "calc_10_plus_5" // idempotency key
128//! )?;
129//!
130//! let result = worker.process_job(job).await.unwrap();
131//! println!("Result: {:?}", result);
132//!
133//! // Get metrics
134//! let metrics = worker.metrics();
135//! println!("Jobs processed: {}", metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));
136//! # Ok(())
137//! # }
138//! ```
139
140use async_trait::async_trait;
141use dashmap::DashMap;
142use std::collections::HashMap;
143use std::sync::Arc;
144use std::time::Duration;
145use tokio::sync::{OwnedSemaphorePermit, Semaphore};
146use tracing::{debug, error, info, warn};
147
148use crate::error::{ToolError, WorkerError};
149use crate::idempotency::IdempotencyStore;
150use crate::jobs::{Job, JobResult};
151use crate::queue::JobQueue;
152use crate::signer::SignerContext;
153
154/// A trait defining the execution interface for tools.
155///
156/// This is compatible with `rig::Tool` and provides the foundation
157/// for executing tools within the riglr ecosystem. Tools represent
158/// individual operations that can be executed by the [`ToolWorker`].
159///
160/// ## Design Principles
161///
162/// - **Stateless**: Tools should not maintain internal state between executions
163/// - **Idempotent**: When possible, tools should be safe to retry
164/// - **Error-aware**: Tools should classify errors as retriable or permanent
165/// - **Resource-conscious**: Tools should handle rate limits and timeouts gracefully
166///
167/// ## Error Handling
168///
169/// Tools should carefully distinguish between different types of errors:
170///
171/// - **Retriable errors**: Network timeouts, rate limits, temporary service unavailability
172/// - **Permanent errors**: Invalid parameters, insufficient funds, authorization failures
173/// - **System errors**: Internal configuration issues, unexpected state
174///
175/// ## Examples
176///
177/// ### Basic Tool Implementation
178///
179/// ```ignore
180/// use riglr_core::{Tool, JobResult};
181/// use async_trait::async_trait;
182/// use serde_json::Value;
183///
184/// struct HttpFetcher;
185///
186/// #[async_trait]
187/// impl Tool for HttpFetcher {
188/// async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
189/// let url = params["url"].as_str()
190/// .ok_or("Missing required parameter: url")?;
191///
192/// // Mock HTTP client behavior for this example
193/// if url.starts_with("https://") {
194/// let body = r#"{"data": "success"}"#;
195/// Ok(JobResult::success(&serde_json::json!({"body": body}))?)
196/// } else if url.contains("error") {
197/// // Simulate client error
198/// Ok(JobResult::Failure {
199/// error: ToolError::permanent_string("Client error: Invalid URL")
200/// })
201/// } else if url.contains("timeout") {
202/// // Simulate timeout
203/// Ok(JobResult::Failure {
204/// error: ToolError::retriable_string("Request timeout: Connection timed out")
205/// })
206/// } else {
207/// // Simulate server error
208/// Ok(JobResult::Failure {
209/// error: ToolError::retriable_string("Server error: HTTP 503")
210/// })
211/// }
212/// }
213///
///     fn name(&self) -> &str {
///         "http_fetcher"
///     }
///
///     fn description(&self) -> &str {
///         "Fetches the body of an HTTPS URL"
///     }
217/// }
218/// ```
219///
220/// ### Tool with Resource-Aware Naming
221///
222/// ```ignore
223/// use riglr_core::{Tool, JobResult};
224/// use async_trait::async_trait;
225/// use serde_json::Value;
226///
227/// // The name starts with "solana_" which will use the solana_rpc resource limit
228/// struct SolanaBalanceChecker;
229///
230/// #[async_trait]
231/// impl Tool for SolanaBalanceChecker {
232/// async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
233/// let address = params["address"].as_str()
234/// .ok_or("Missing required parameter: address")?;
235///
236/// // This tool will be rate-limited by the "solana_rpc" resource limit
237/// // because its name starts with "solana_"
238///
239/// // Implementation would use Solana RPC client here...
240/// let balance = 1.5; // Mock balance
241///
242/// Ok(JobResult::success(&serde_json::json!({
243/// "address": address,
244/// "balance": balance,
245/// "unit": "SOL"
246/// }))?)
247/// }
248///
///     fn name(&self) -> &str {
///         "solana_balance_check" // Name prefix determines resource pool
///     }
///
///     fn description(&self) -> &str {
///         "Returns the SOL balance of a Solana address"
///     }
252/// }
253/// ```
254///
255/// ### Tool Using Signer Context
256///
257/// ```ignore
258/// use riglr_core::{Tool, JobResult, SignerContext};
259/// use async_trait::async_trait;
260/// use serde_json::Value;
261///
262/// struct TransferTool;
263///
264/// #[async_trait]
265/// impl Tool for TransferTool {
266/// async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
267/// // Check if we have a signer context
268/// if !SignerContext::is_available().await {
269/// return Ok(JobResult::Failure {
270/// error: ToolError::permanent_string("Transfer operations require a signer context")
271/// });
272/// }
273///
274/// let signer = SignerContext::current().await
275/// .map_err(|_| "Failed to get signer context")?;
276///
277/// let recipient = params["recipient"].as_str()
278/// .ok_or("Missing required parameter: recipient")?;
279/// let amount = params["amount"].as_f64()
280/// .ok_or("Missing required parameter: amount")?;
281///
282/// // Use the signer to perform the transfer...
283/// // This is just a mock implementation
284/// Ok(JobResult::success_with_tx(
285/// &serde_json::json!({
286/// "recipient": recipient,
287/// "amount": amount,
288/// "status": "completed"
289/// }),
290/// "mock_tx_hash_12345"
291/// )?)
292/// }
293///
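///     fn name(&self) -> &str {
///         "transfer"
///     }
///
///     fn description(&self) -> &str {
///         "Transfers an amount to a recipient using the current signer"
///     }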
297/// }
298/// ```
299#[async_trait]
300pub trait Tool: Send + Sync {
301 /// Execute the tool with the given parameters.
302 ///
303 /// This method performs the core work of the tool. It receives parameters
304 /// as a JSON value and should return a [`JobResult`] indicating success or failure.
305 ///
306 /// # Parameters
307 /// * `params` - JSON parameters passed to the tool. Tools should validate
308 /// and extract required parameters, returning appropriate errors
309 /// for missing or invalid data.
310 ///
311 /// # Returns
312 /// * `Ok(JobResult::Success)` - Tool executed successfully with result data
313 /// * `Ok(JobResult::Failure { retriable: true })` - Tool failed but can be retried
314 /// * `Ok(JobResult::Failure { retriable: false })` - Tool failed permanently
315 /// * `Err(Box<dyn Error>)` - Unexpected system error occurred
316 ///
317 /// # Error Classification Guidelines
318 ///
319 /// Return retriable failures (`JobResult::Failure { error: ToolError::retriable_string(...) }`) for:
320 /// - Network timeouts or connection errors
321 /// - Rate limiting (HTTP 429, RPC rate limits) - use `ToolError::rate_limited_string` for these
322 /// - Temporary service unavailability (HTTP 503)
323 /// - Blockchain congestion or temporary RPC failures
324 ///
325 /// Return permanent failures (`JobResult::Failure { error: ToolError::permanent_string(...) }`) for:
326 /// - Invalid parameters or malformed requests - use `ToolError::invalid_input_string` for these
327 /// - Authentication/authorization failures
328 /// - Insufficient funds or balance
329 /// - Invalid blockchain addresses or transaction data
330 ///
331 /// # Examples
332 ///
333 /// ```ignore
334 /// use riglr_core::{Tool, JobResult};
335 /// use async_trait::async_trait;
336 /// use serde_json::Value;
337 ///
338 /// struct WeatherTool;
339 ///
340 /// #[async_trait]
341 /// impl Tool for WeatherTool {
342 /// async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
343 /// // Validate parameters
344 /// let city = params["city"].as_str()
345 /// .ok_or("Missing required parameter: city")?;
346 ///
347 /// if city.is_empty() {
348 /// return Ok(JobResult::Failure {
349 /// error: ToolError::invalid_input_string("City name cannot be empty")
350 /// });
351 /// }
352 ///
353 /// // Simulate API call with mock weather service
354 /// let weather_data = if city == "InvalidCity" {
355 /// return Ok(JobResult::Failure {
356 /// error: ToolError::permanent_string(format!("City not found: {}", city))
357 /// });
358 /// } else if city == "TimeoutCity" {
359 /// return Ok(JobResult::Failure {
360 /// error: ToolError::retriable_string("Network timeout")
361 /// });
362 /// } else {
363 /// serde_json::json!({
364 /// "city": city,
365 /// "temperature": 22,
366 /// "condition": "sunny"
367 /// })
368 /// };
369 ///
370 /// Ok(JobResult::success(&weather_data)?)
371 /// }
372 ///
373 /// fn name(&self) -> &str {
374 /// "weather"
375 /// }
376 /// }
377 /// ```
378 async fn execute(
379 &self,
380 params: serde_json::Value,
381 context: &crate::provider::ApplicationContext,
382 ) -> Result<JobResult, ToolError>;
383
384 /// Get the name of this tool.
385 ///
386 /// The tool name is used for:
387 /// - Tool registration and lookup in the worker
388 /// - Job identification and logging
389 /// - Resource limit mapping (based on name prefixes)
390 /// - Metrics and monitoring
391 ///
392 /// # Naming Conventions
393 ///
394 /// Tool names should follow these patterns for automatic resource management:
395 ///
396 /// - `solana_*` - Uses "solana_rpc" resource pool (e.g., "solana_balance", "solana_transfer")
397 /// - `evm_*` - Uses "evm_rpc" resource pool (e.g., "evm_balance", "evm_swap")
398 /// - `web_*` - Uses "http_api" resource pool (e.g., "web_fetch", "web_scrape")
399 /// - Others - Use default resource pool
400 ///
401 /// # Returns
402 /// A string identifier for this tool that must be unique within a worker.
403 ///
404 /// # Examples
405 ///
406 /// ```ignore
407 /// use riglr_core::Tool;
408 /// # use async_trait::async_trait;
409 /// # use riglr_core::JobResult;
410 ///
411 /// struct BitcoinPriceChecker;
412 ///
413 /// # #[async_trait]
414 /// # impl Tool for BitcoinPriceChecker {
415 /// # async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
416 /// # Ok(JobResult::success(&42)?)
417 /// # }
418 /// #
419 /// fn name(&self) -> &str {
420 /// "web_bitcoin_price" // Will use "http_api" resource pool
421 /// }
422 /// # }
423 /// ```
424 fn name(&self) -> &str;
425
426 /// A concise, AI-friendly description of this tool's purpose.
427 ///
428 /// This should be a short, human-readable sentence that explains
429 /// what the tool does and when to use it. It's intended for AI models
430 /// and end-user UIs, and is separate from developer-facing rustdoc.
431 ///
432 /// When using the `#[tool]` macro from `riglr-macros`, this value is:
433 /// - Taken from the explicit `#[tool(description = "...")]` attribute when provided
434 /// - Otherwise derived from the item's doc comments
435 /// - Falls back to an empty string if neither is present
436 fn description(&self) -> &str;
437
438 /// Returns the JSON schema for this tool's parameters.
439 ///
440 /// This schema is used by AI models to understand what parameters
441 /// the tool accepts and their types. It should be a valid JSON Schema
442 /// object describing the parameters structure.
443 ///
444 /// The default implementation returns a generic object schema that
445 /// accepts any properties, which may not work well with all AI providers.
446 /// Tools should override this to provide their actual parameter schema.
447 fn schema(&self) -> serde_json::Value {
448 // Default to a generic object schema
449 serde_json::json!({
450 "type": "object",
451 "additionalProperties": true
452 })
453 }
454}
455
/// Tunable knobs governing how a [`ToolWorker`] runs tools.
///
/// Covers the retry policy (attempt count and backoff bounds), the
/// per-execution timeout, the default concurrency limit, and the
/// idempotency cache settings.
///
/// # Examples
///
/// ```ignore
/// use riglr_core::ExecutionConfig;
/// use std::time::Duration;
///
/// // Default configuration
/// let config = ExecutionConfig::default();
///
/// // Custom configuration for high-throughput scenario
/// let high_throughput_config = ExecutionConfig {
///     max_concurrency: 50,
///     default_timeout: Duration::from_secs(120),
///     max_retries: 5,
///     initial_retry_delay: Duration::from_millis(200),
///     max_retry_delay: Duration::from_secs(60),
///     enable_idempotency: true,
///     ..Default::default()
/// };
///
/// // Conservative configuration for sensitive operations
/// let conservative_config = ExecutionConfig {
///     max_concurrency: 5,
///     default_timeout: Duration::from_secs(300),
///     max_retries: 1,
///     initial_retry_delay: Duration::from_secs(5),
///     max_retry_delay: Duration::from_secs(30),
///     enable_idempotency: true,
///     ..Default::default()
/// };
/// ```
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
    /// Upper bound on concurrent executions per resource type.
    ///
    /// Acts as the fallback concurrency limit whenever no dedicated limit
    /// has been configured for a resource; per-resource limits are set
    /// through [`ResourceLimits`].
    ///
    /// **Default**: 10
    pub max_concurrency: usize,

    /// Time budget for a single tool execution.
    ///
    /// An execution that runs longer than this is cancelled, so tools
    /// should be written to finish within a reasonable bound.
    ///
    /// **Default**: 30 seconds
    pub default_timeout: Duration,

    /// How many times a failed operation may be retried.
    ///
    /// Only retriable failures count; permanent failures are never retried.
    /// Total execution attempts are therefore `max_retries + 1`.
    ///
    /// **Default**: 3 retries (4 total attempts)
    pub max_retries: u32,

    /// Starting delay for exponential backoff.
    ///
    /// This is the wait before the first retry; later retries wait
    /// exponentially longer.
    ///
    /// **Default**: 100 milliseconds
    pub initial_retry_delay: Duration,

    /// Ceiling for the exponential backoff delay.
    ///
    /// No retry waits longer than this, which keeps heavily retried
    /// operations from stalling for excessive periods.
    ///
    /// **Default**: 10 seconds
    pub max_retry_delay: Duration,

    /// Lifetime of entries in the idempotency cache.
    ///
    /// Results of completed operations that carry an idempotency key are
    /// kept for this long; a repeated request with the same key receives
    /// the cached result.
    ///
    /// **Default**: 1 hour
    pub idempotency_ttl: Duration,

    /// Master switch for idempotency checking.
    ///
    /// When `true`, jobs carrying an idempotency key have their results
    /// cached, and identical follow-up requests are answered from the cache
    /// rather than by running the tool again.
    ///
    /// **Default**: true
    pub enable_idempotency: bool,
}

impl Default for ExecutionConfig {
    /// Returns the documented defaults: 10 concurrent executions, a 30 s
    /// timeout, 3 retries backing off from 100 ms up to 10 s, and
    /// idempotency enabled with a one-hour TTL.
    fn default() -> Self {
        let one_hour = Duration::from_secs(60 * 60);

        ExecutionConfig {
            enable_idempotency: true,
            idempotency_ttl: one_hour,
            max_retry_delay: Duration::from_secs(10),
            initial_retry_delay: Duration::from_millis(100),
            max_retries: 3,
            default_timeout: Duration::from_secs(30),
            max_concurrency: 10,
        }
    }
}
566
567/// Resource limits configuration for fine-grained concurrency control.
568///
569/// This struct allows you to set different concurrency limits for different
570/// types of operations, preventing any single resource type from overwhelming
571/// external services or consuming too many system resources.
572///
573/// Resource limits are applied based on tool name prefixes:
574/// - Tools starting with `solana_` use the "solana_rpc" resource pool
575/// - Tools starting with `evm_` use the "evm_rpc" resource pool
576/// - Tools starting with `web_` use the "http_api" resource pool
577/// - All other tools use the default concurrency limit
578///
579/// # Examples
580///
581/// ```rust
582/// use riglr_core::ResourceLimits;
583///
584/// // Create limits for different resource types
585/// let limits = ResourceLimits::default()
586/// .with_limit("solana_rpc", 3) // Max 3 concurrent Solana RPC calls
587/// .with_limit("evm_rpc", 8) // Max 8 concurrent EVM RPC calls
588/// .with_limit("http_api", 15) // Max 15 concurrent HTTP requests
589/// .with_limit("database", 5); // Max 5 concurrent database operations
590///
591/// // Use default limits (solana_rpc: 5, evm_rpc: 10, http_api: 20)
592/// let default_limits = ResourceLimits::default();
593/// ```
594///
595/// # Resource Pool Mapping
596///
597/// The system automatically maps tool names to resource pools:
598///
599/// ```text
600/// Tool Name → Resource Pool → Limit
601/// "solana_balance" → "solana_rpc" → configured limit
602/// "evm_swap" → "evm_rpc" → configured limit
603/// "web_fetch" → "http_api" → configured limit
604/// "custom_tool" → default → ExecutionConfig.max_concurrency
605/// ```
606#[derive(Debug, Clone)]
607pub struct ResourceLimits {
608 /// Resource name to semaphore mapping.
609 ///
610 /// Each semaphore controls the maximum number of concurrent operations
611 /// for its associated resource type.
612 semaphores: Arc<HashMap<String, Arc<Semaphore>>>,
613}
614
615impl ResourceLimits {
616 /// Add a resource limit for the specified resource type.
617 ///
618 /// This creates a semaphore that will limit concurrent access to the
619 /// specified resource. Tools with names matching the resource mapping
620 /// will be subject to this limit.
621 ///
622 /// # Parameters
623 /// * `resource` - The resource identifier (e.g., "solana_rpc", "evm_rpc")
624 /// * `limit` - Maximum number of concurrent operations for this resource
625 ///
626 /// # Returns
627 /// Self, for method chaining
628 ///
629 /// # Examples
630 ///
631 /// ```ignore
632 /// use riglr_core::ResourceLimits;
633 ///
634 /// let limits = ResourceLimits::default()
635 /// .with_limit("solana_rpc", 3) // Limit Solana RPC calls
636 /// .with_limit("database", 10) // Limit database connections
637 /// .with_limit("external_api", 5); // Limit external API calls
638 /// ```
639 pub fn with_limit(mut self, resource: impl Into<String>, limit: usize) -> Self {
640 let semaphores = Arc::make_mut(&mut self.semaphores);
641 semaphores.insert(resource.into(), Arc::new(Semaphore::new(limit)));
642 self
643 }
644
645 /// Get the semaphore for a specific resource type.
646 ///
647 /// This is used internally by the [`ToolWorker`] to acquire permits
648 /// before executing tools. Returns `None` if no limit is configured
649 /// for the specified resource.
650 ///
651 /// # Parameters
652 /// * `resource` - The resource identifier to look up
653 ///
654 /// # Returns
655 /// * `Some(Arc<Semaphore>)` - If a limit is configured for this resource
656 /// * `None` - If no limit is configured (will use default limit)
657 ///
658 /// # Examples
659 ///
660 /// ```rust
661 /// use riglr_core::ResourceLimits;
662 ///
663 /// let limits = ResourceLimits::default()
664 /// .with_limit("test_resource", 5);
665 ///
666 /// assert!(limits.get_semaphore("test_resource").is_some());
667 /// assert!(limits.get_semaphore("unknown_resource").is_none());
668 /// ```
669 pub fn get_semaphore(&self, resource: &str) -> Option<Arc<Semaphore>> {
670 self.semaphores.get(resource).cloned()
671 }
672}
673
674impl Default for ResourceLimits {
675 fn default() -> Self {
676 let mut semaphores = HashMap::new();
677 semaphores.insert("solana_rpc".to_string(), Arc::new(Semaphore::new(5)));
678 semaphores.insert("evm_rpc".to_string(), Arc::new(Semaphore::new(10)));
679 semaphores.insert("http_api".to_string(), Arc::new(Semaphore::new(20)));
680 Self {
681 semaphores: Arc::new(semaphores),
682 }
683 }
684}
685
686/// A worker that processes jobs from a queue using registered tools.
687///
688/// The `ToolWorker` is the core execution engine of the riglr system. It manages
689/// registered tools, processes jobs with resilience features, and provides
690/// comprehensive monitoring and observability.
691///
692/// ## Key Features
693///
694/// - **Resilient execution**: Automatic retries with exponential backoff
695/// - **Resource management**: Configurable limits per resource type
696/// - **Idempotency support**: Cached results for safe retries
697/// - **Comprehensive monitoring**: Built-in metrics and structured logging
698/// - **Concurrent processing**: Efficient parallel job execution
699///
700/// ## Architecture
701///
702/// The worker operates on a job-based model where:
703/// 1. Jobs are dequeued from a [`JobQueue`]
704/// 2. Tools are looked up by name and executed
705/// 3. Results are processed with retry logic for failures
706/// 4. Successful results are optionally cached for idempotency
707/// 5. Metrics are updated for monitoring
708///
709/// ## Examples
710///
711/// ### Basic Setup
712///
713/// ```ignore
714/// use riglr_core::{
715/// ToolWorker, ExecutionConfig, ResourceLimits,
716/// idempotency::InMemoryIdempotencyStore
717/// };
718/// use std::sync::Arc;
719///
720/// # async fn example() -> anyhow::Result<()> {
721/// // Create worker with default configuration
722/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
723/// ExecutionConfig::default()
724/// );
725///
726/// // Add idempotency store for safe retries
727/// let store = Arc::new(InMemoryIdempotencyStore::new());
728/// let worker = worker.with_idempotency_store(store);
729///
730/// // Configure resource limits
731/// let limits = ResourceLimits::default()
732/// .with_limit("solana_rpc", 3)
733/// .with_limit("evm_rpc", 5);
734/// let worker = worker.with_resource_limits(limits);
735/// # Ok(())
736/// # }
737/// ```
738///
739/// ### Processing Jobs
740///
741/// ```ignore
742/// use riglr_core::{ToolWorker, Job, Tool, JobResult, ExecutionConfig};
743/// use riglr_core::idempotency::InMemoryIdempotencyStore;
744/// use async_trait::async_trait;
745/// use std::sync::Arc;
746///
747/// # struct ExampleTool;
748/// # #[async_trait]
749/// # impl Tool for ExampleTool {
750/// # async fn execute(&self, _: serde_json::Value, _: &crate::provider::ApplicationContext) -> Result<JobResult, ToolError> {
751/// # Ok(JobResult::success(&"example result")?)
752/// # }
753/// # fn name(&self) -> &str { "example" }
754/// # }
755/// # async fn example() -> anyhow::Result<()> {
756/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
757/// ExecutionConfig::default()
758/// );
759///
760/// // Register tools
761/// worker.register_tool(Arc::new(ExampleTool)).await;
762///
763/// // Process a job
764/// let job = Job::new("example", &serde_json::json!({}), 3)?;
765/// let result = worker.process_job(job).await.unwrap();
766///
767/// println!("Job result: {:?}", result);
768///
769/// // Check metrics
770/// let metrics = worker.metrics();
771/// println!("Jobs processed: {}",
772/// metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));
773/// # Ok(())
774/// # }
775/// ```
776pub struct ToolWorker<I: IdempotencyStore + 'static> {
777 /// Registered tools, indexed by name for fast lookup
778 tools: Arc<DashMap<String, Arc<dyn Tool>>>,
779
780 /// Default semaphore for general concurrency control
781 default_semaphore: Arc<Semaphore>,
782
783 /// Resource-specific limits for fine-grained control
784 resource_limits: ResourceLimits,
785
786 /// Configuration for retry behavior, timeouts, etc.
787 config: ExecutionConfig,
788
789 /// Optional idempotency store for caching results
790 idempotency_store: Option<Arc<I>>,
791
792 /// Performance and operational metrics
793 metrics: Arc<WorkerMetrics>,
794
795 /// Application context providing access to RPC providers and shared resources
796 app_context: crate::provider::ApplicationContext,
797}
798
/// Performance and operational metrics for monitoring worker health.
///
/// These metrics provide insight into worker performance and can be used
/// for monitoring, alerting, and performance optimization.
///
/// Every counter is an [`AtomicU64`](std::sync::atomic::AtomicU64), so it can
/// be updated and read through a shared reference. `Default` starts all
/// counters at zero.
///
/// ## Metrics Tracked
///
/// - **jobs_processed**: Total number of jobs dequeued and processed
/// - **jobs_succeeded**: Number of jobs that completed successfully
/// - **jobs_failed**: Number of jobs that failed permanently (after all retries)
/// - **jobs_retried**: Total number of retry attempts across all jobs
///
/// ## Examples
///
/// ```ignore
/// use riglr_core::{
///     ToolWorker, ExecutionConfig,
///     idempotency::InMemoryIdempotencyStore,
///     provider::ApplicationContext,
/// };
/// use riglr_config::Config;
/// use std::sync::atomic::Ordering;
///
/// # async fn example() {
/// let config = Config::from_env();
/// let app_context = ApplicationContext::from_config(&config);
/// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
///     ExecutionConfig::default(),
///     app_context,
/// );
///
/// let metrics = worker.metrics();
///
/// // Read current metrics
/// let processed = metrics.jobs_processed.load(Ordering::Relaxed);
/// let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
/// let failed = metrics.jobs_failed.load(Ordering::Relaxed);
/// let retried = metrics.jobs_retried.load(Ordering::Relaxed);
///
/// println!("Worker Stats:");
/// println!("  Processed: {}", processed);
/// println!("  Succeeded: {}", succeeded);
/// println!("  Failed: {}", failed);
/// println!("  Retried: {}", retried);
/// println!("  Success Rate: {:.2}%",
///     if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 });
/// # }
/// ```
#[derive(Debug, Default)]
pub struct WorkerMetrics {
    /// Total number of jobs processed (dequeued and executed)
    pub jobs_processed: std::sync::atomic::AtomicU64,

    /// Number of jobs that completed successfully
    pub jobs_succeeded: std::sync::atomic::AtomicU64,

    /// Number of jobs that failed permanently (after all retries)
    pub jobs_failed: std::sync::atomic::AtomicU64,

    /// Total number of retry attempts across all jobs
    pub jobs_retried: std::sync::atomic::AtomicU64,
}
855
856impl<I: IdempotencyStore + 'static> std::fmt::Debug for ToolWorker<I> {
857 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
858 f.debug_struct("ToolWorker")
859 .field("tools_count", &self.tools.len())
860 .field("config", &self.config)
861 .finish_non_exhaustive()
862 }
863}
864
865impl<I: IdempotencyStore + 'static> ToolWorker<I> {
866 /// Create a new tool worker with the given configuration and application context.
867 ///
868 /// This creates a worker ready to process jobs, but no tools are registered yet.
869 /// Use [`register_tool()`](Self::register_tool) to add tools before processing jobs.
870 ///
871 /// # Parameters
872 /// * `config` - Execution configuration controlling retry behavior, timeouts, etc.
873 /// * `app_context` - Application context providing access to RPC providers and shared resources
874 ///
875 /// # Examples
876 ///
877 /// ```ignore
878 /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
879 /// use std::time::Duration;
880 /// use riglr_config::Config;
881 ///
882 /// let exec_config = ExecutionConfig {
883 /// max_concurrency: 20,
884 /// default_timeout: Duration::from_secs(60),
885 /// max_retries: 5,
886 /// ..Default::default()
887 /// };
888 /// let config = Config::from_env();
889 /// let app_context = ApplicationContext::from_config(&config);
890 ///
891 /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(exec_config, app_context);
892 /// ```
893 pub fn new(config: ExecutionConfig, app_context: crate::provider::ApplicationContext) -> Self {
894 Self {
895 tools: Arc::new(DashMap::new()),
896 default_semaphore: Arc::new(Semaphore::new(config.max_concurrency)),
897 resource_limits: ResourceLimits::default(),
898 config,
899 idempotency_store: None,
900 metrics: Arc::new(WorkerMetrics::default()),
901 app_context,
902 }
903 }
904
905 /// Configure an idempotency store for result caching.
906 ///
907 /// When an idempotency store is configured, jobs with idempotency keys
908 /// will have their results cached. Subsequent executions with the same
909 /// idempotency key will return the cached result instead of re-executing.
910 ///
911 /// # Parameters
912 /// * `store` - The idempotency store implementation to use
913 ///
914 /// # Returns
915 /// Self, for method chaining
916 ///
917 /// # Examples
918 ///
919 /// ```ignore
920 /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
921 /// use std::sync::Arc;
922 ///
923 /// let store = Arc::new(InMemoryIdempotencyStore::new());
924 /// let worker = ToolWorker::new(ExecutionConfig::default())
925 /// .with_idempotency_store(store);
926 /// ```
927 pub fn with_idempotency_store(mut self, store: Arc<I>) -> Self {
928 self.idempotency_store = Some(store);
929 self
930 }
931
932 /// Configure custom resource limits.
933 ///
934 /// Resource limits control how many concurrent operations can run
935 /// for each resource type. This prevents overwhelming external
936 /// services and provides fine-grained control over resource usage.
937 ///
938 /// # Parameters
939 /// * `limits` - The resource limits configuration to use
940 ///
941 /// # Returns
942 /// Self, for method chaining
943 ///
944 /// # Examples
945 ///
946 /// ```ignore
947 /// use riglr_core::{ToolWorker, ExecutionConfig, ResourceLimits, idempotency::InMemoryIdempotencyStore};
948 ///
949 /// let limits = ResourceLimits::default()
950 /// .with_limit("solana_rpc", 3)
951 /// .with_limit("evm_rpc", 8)
952 /// .with_limit("http_api", 15);
953 ///
954 /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default())
955 /// .with_resource_limits(limits);
956 /// ```
957 pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
958 self.resource_limits = limits;
959 self
960 }
961
962 /// Register a tool with this worker.
963 ///
964 /// Tools must be registered before they can be executed by jobs.
965 /// Each tool is indexed by its name, so tool names must be unique
966 /// within a single worker.
967 ///
968 /// # Parameters
969 /// * `tool` - The tool implementation to register
970 ///
971 /// # Panics
972 /// This method does not panic, but if a tool with the same name
973 /// is already registered, it will be replaced.
974 ///
975 /// # Examples
976 ///
977 /// ```ignore
978 /// use riglr_core::{ToolWorker, ExecutionConfig, Tool, JobResult, idempotency::InMemoryIdempotencyStore};
979 /// use async_trait::async_trait;
980 /// use std::sync::Arc;
981 ///
982 /// struct CalculatorTool;
983 ///
984 /// #[async_trait]
985 /// impl Tool for CalculatorTool {
986 /// async fn execute(&self, params: serde_json::Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
987 /// let a = params["a"].as_f64().unwrap_or(0.0);
988 /// let b = params["b"].as_f64().unwrap_or(0.0);
989 /// Ok(JobResult::success(&(a + b))?)
990 /// }
991 ///
992 /// fn name(&self) -> &str {
993 /// "calculator"
994 /// }
995 /// }
996 ///
997 /// # async fn example() {
998 /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default());
999 /// worker.register_tool(Arc::new(CalculatorTool)).await;
1000 /// # }
1001 /// ```
1002 pub async fn register_tool(&self, tool: Arc<dyn Tool>) {
1003 self.tools.insert(tool.name().to_string(), tool);
1004 }
1005
1006 /// Get access to worker metrics.
1007 ///
1008 /// The returned metrics can be used for monitoring worker performance
1009 /// and health. All metrics are thread-safe and can be read at any time.
1010 ///
1011 /// # Returns
1012 /// A reference to the worker's metrics
1013 ///
1014 /// # Examples
1015 ///
1016 /// ```ignore
1017 /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
1018 /// use std::sync::atomic::Ordering;
1019 ///
1020 /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default());
1021 /// let metrics = worker.metrics();
1022 ///
1023 /// println!("Jobs processed: {}",
1024 /// metrics.jobs_processed.load(Ordering::Relaxed));
1025 /// println!("Success rate: {:.2}%", {
1026 /// let processed = metrics.jobs_processed.load(Ordering::Relaxed);
1027 /// let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
1028 /// if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 }
1029 /// });
1030 /// ```
1031 pub fn metrics(&self) -> &WorkerMetrics {
1032 &self.metrics
1033 }
1034
1035 /// Process a single job with all resilience features.
1036 ///
1037 /// Returns:
1038 /// - `Ok(JobResult)` - The job was processed (successfully or with business logic failure)
1039 /// - `Err(WorkerError)` - System-level worker failure (tool not found, semaphore issues, etc.)
1040 pub async fn process_job(&self, mut job: Job) -> Result<JobResult, WorkerError> {
1041 // Check idempotency cache first
1042 if let Some(cached_result) = self.check_idempotency_cache(&job).await? {
1043 return Ok(Arc::try_unwrap(cached_result).unwrap_or_else(|arc| (*arc).clone()));
1044 }
1045
1046 // Apply rate limiting for operations within a SignerContext that has a user_id
1047 if let Ok(signer) = SignerContext::current().await {
1048 if let Some(user_id) = signer.user_id() {
1049 if let Err(rate_limit_error) =
1050 self.app_context.rate_limiter.check_rate_limit(&user_id)
1051 {
1052 // Convert rate limit error to a JobResult::Failure
1053 return Ok(JobResult::Failure {
1054 error: rate_limit_error,
1055 });
1056 }
1057 }
1058 }
1059
1060 // Acquire appropriate semaphore
1061 let _permit = self.acquire_semaphore(&job.tool_name).await.map_err(|e| {
1062 WorkerError::SemaphoreAcquisition {
1063 tool_name: job.tool_name.clone(),
1064 source_message: e.to_string(),
1065 }
1066 })?;
1067
1068 // Get the tool for this job
1069 let tool = self.get_tool(&job.tool_name).await?;
1070
1071 // Execute with retries
1072 let result = self.execute_with_retries(tool, &mut job).await;
1073
1074 // Cache successful results
1075 if let Ok(ref job_result) = result {
1076 self.cache_result(&job, job_result).await;
1077 }
1078
1079 result
1080 }
1081
1082 /// Check if there's a cached result for this job
1083 async fn check_idempotency_cache(
1084 &self,
1085 job: &Job,
1086 ) -> Result<Option<Arc<JobResult>>, WorkerError> {
1087 if let Some(ref idempotency_key) = job.idempotency_key {
1088 if self.config.enable_idempotency {
1089 if let Some(ref store) = self.idempotency_store {
1090 match store.get(idempotency_key).await {
1091 Ok(Some(cached_result)) => {
1092 info!(
1093 "Returning cached result for idempotency key: {}",
1094 idempotency_key
1095 );
1096 return Ok(Some(cached_result));
1097 }
1098 Ok(None) => {} // No cached result
1099 Err(e) => {
1100 return Err(WorkerError::IdempotencyStore {
1101 source_message: e.to_string(),
1102 });
1103 }
1104 }
1105 }
1106 }
1107 }
1108 Ok(None)
1109 }
1110
1111 /// Get a tool from the registry
1112 async fn get_tool(&self, tool_name: &str) -> Result<Arc<dyn Tool>, WorkerError> {
1113 self.tools
1114 .get(tool_name)
1115 .ok_or_else(|| WorkerError::ToolNotFound {
1116 tool_name: tool_name.to_string(),
1117 })
1118 .map(|entry| (*entry).clone())
1119 }
1120
1121 /// Execute a tool with retry logic using unified retry helper
1122 async fn execute_with_retries(
1123 &self,
1124 tool: Arc<dyn Tool>,
1125 job: &mut Job,
1126 ) -> Result<JobResult, WorkerError> {
1127 use crate::retry::{retry_async, ErrorClass, RetryConfig};
1128
1129 // Create retry config from worker config
1130 let retry_config = RetryConfig {
1131 max_retries: job.max_retries,
1132 base_delay_ms: self.config.initial_retry_delay.as_millis() as u64,
1133 max_delay_ms: self.config.max_retry_delay.as_millis() as u64,
1134 backoff_multiplier: 2.0,
1135 use_jitter: true,
1136 };
1137
1138 // Use unified retry helper
1139 let result = retry_async(
1140 || async { self.execute_single_attempt(&tool, &job.params).await },
1141 |error| {
1142 // Classify error based on ToolError's is_retriable
1143 if error.is_retriable() {
1144 ErrorClass::Retryable
1145 } else {
1146 ErrorClass::Permanent
1147 }
1148 },
1149 &retry_config,
1150 &format!("job_{}", job.job_id),
1151 )
1152 .await;
1153
1154 // Update metrics based on result
1155 match result {
1156 Ok(job_result) => {
1157 self.metrics
1158 .jobs_succeeded
1159 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1160 Ok(job_result)
1161 }
1162 Err(tool_error) => {
1163 self.metrics
1164 .jobs_failed
1165 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1166 // Track retries that occurred - the retry helper manages the actual retries
1167 if retry_config.max_retries > 0 {
1168 self.metrics
1169 .jobs_retried
1170 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1171 }
1172 Ok(JobResult::Failure { error: tool_error })
1173 }
1174 }
1175 }
1176
1177 /// Execute a single attempt of a tool
1178 async fn execute_single_attempt(
1179 &self,
1180 tool: &Arc<dyn Tool>,
1181 params: &serde_json::Value,
1182 ) -> Result<JobResult, ToolError> {
1183 // Execute with timeout
1184 let result = tokio::time::timeout(
1185 self.config.default_timeout,
1186 tool.execute(params.clone(), &self.app_context),
1187 )
1188 .await;
1189
1190 match result {
1191 Ok(Ok(job_result)) => Ok(job_result),
1192 Ok(Err(e)) => {
1193 // Propagate the ToolError directly to preserve its structure
1194 warn!("Tool execution failed: {}", e);
1195 Err(e)
1196 }
1197 Err(_) => {
1198 let error_msg = format!(
1199 "Tool execution timed out after {:?}",
1200 self.config.default_timeout
1201 );
1202 warn!("{}", error_msg);
1203 // Timeout errors are retriable
1204 Err(ToolError::retriable_string(error_msg))
1205 }
1206 }
1207 }
1208
1209 // Note: create_backoff_strategy and wait_with_backoff methods removed
1210 // as they are now handled by the unified retry helper in retry.rs
1211
1212 /// Cache a successful result
1213 async fn cache_result(&self, job: &Job, job_result: &JobResult) {
1214 if let Some(ref idempotency_key) = job.idempotency_key {
1215 if self.config.enable_idempotency {
1216 if let Some(ref store) = self.idempotency_store {
1217 let _ = store
1218 .set(
1219 idempotency_key,
1220 Arc::new(job_result.clone()),
1221 self.config.idempotency_ttl,
1222 )
1223 .await;
1224 }
1225 }
1226 }
1227 }
1228
1229 /// Acquire the appropriate semaphore for a tool
1230 async fn acquire_semaphore(
1231 &self,
1232 tool_name: &str,
1233 ) -> Result<OwnedSemaphorePermit, Box<dyn std::error::Error + Send + Sync>> {
1234 // Check if there's a specific resource limit for this tool
1235 let resource_name = match tool_name {
1236 name if name.starts_with("solana_") => "solana_rpc",
1237 name if name.starts_with("evm_") => "evm_rpc",
1238 name if name.starts_with("web_") => "http_api",
1239 _ => "",
1240 };
1241
1242 if !resource_name.is_empty() {
1243 if let Some(semaphore) = self.resource_limits.get_semaphore(resource_name) {
1244 return Ok(semaphore.acquire_owned().await?);
1245 }
1246 }
1247
1248 // Fall back to default semaphore
1249 Ok(self.default_semaphore.clone().acquire_owned().await?)
1250 }
1251
1252 /// Start the worker loop, processing jobs from the given queue.
1253 ///
1254 /// The worker will continue processing jobs until the provided cancellation token
1255 /// is cancelled. This allows for graceful shutdown where in-flight jobs can complete
1256 /// before the worker stops.
1257 ///
1258 /// # Parameters
1259 /// * `queue` - The job queue to process jobs from
1260 /// * `cancellation_token` - Token to signal when the worker should stop
1261 ///
1262 /// # Examples
1263 ///
1264 /// ```rust
1265 /// use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
1266 /// use riglr_core::provider::ApplicationContext;
1267 /// use riglr_core::queue::InMemoryJobQueue;
1268 /// use riglr_config::ConfigBuilder;
1269 /// use tokio_util::sync::CancellationToken;
1270 /// use std::sync::Arc;
1271 ///
1272 /// # async fn example() -> anyhow::Result<()> {
1273 /// let config = ConfigBuilder::default().build().unwrap();
1274 /// let app_context = ApplicationContext::from_config(&config);
1275 /// let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
1276 /// let queue = Arc::new(InMemoryJobQueue::new());
1277 /// let cancellation_token = CancellationToken::new();
1278 ///
1279 /// // Start worker in background
1280 /// let token_clone = cancellation_token.clone();
1281 /// let worker_handle = tokio::spawn(async move {
1282 /// worker.run(queue, token_clone).await
1283 /// });
1284 ///
1285 /// // Later, signal shutdown
1286 /// cancellation_token.cancel();
1287 /// // Await the task and ignore the inner result for simplicity in docs
1288 /// let _ = worker_handle.await;
1289 /// # Ok(())
1290 /// # }
1291 /// ```
1292 pub async fn run<Q: JobQueue>(
1293 &self,
1294 queue: Arc<Q>,
1295 cancellation_token: tokio_util::sync::CancellationToken,
1296 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1297 info!(
1298 "Starting ToolWorker with {} tools registered",
1299 self.tools.len()
1300 );
1301
1302 while !cancellation_token.is_cancelled() {
1303 tokio::select! {
1304 // Check for cancellation
1305 _ = cancellation_token.cancelled() => {
1306 info!("Worker shutdown requested, stopping job processing");
1307 break;
1308 }
1309
1310 // Try to dequeue and process jobs
1311 result = queue.dequeue_with_timeout(Duration::from_secs(5)) => {
1312 match result {
1313 Ok(Some(job)) => {
1314 let job_id = job.job_id;
1315 let tool_name = job.tool_name.clone();
1316
1317 self.metrics
1318 .jobs_processed
1319 .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
1320
1321 // Spawn task to process job asynchronously
1322 let worker = self.clone();
1323 tokio::spawn(async move {
1324 match worker.process_job(job).await {
1325 Ok(job_result) => {
1326 if job_result.is_success() {
1327 info!("Job {} ({}) completed successfully", job_id, tool_name);
1328 } else {
1329 warn!(
1330 "Job {} ({}) failed: {:?}",
1331 job_id, tool_name, job_result
1332 );
1333 }
1334 }
1335 Err(e) => {
1336 error!("Job {} ({}) processing error: {}", job_id, tool_name, e);
1337 }
1338 }
1339 });
1340 }
1341 Ok(None) => {
1342 // No jobs available, continue
1343 debug!("No jobs available in queue");
1344 }
1345 Err(e) => {
1346 error!("Failed to dequeue job: {}", e);
1347 tokio::time::sleep(Duration::from_secs(1)).await;
1348 }
1349 }
1350 }
1351 }
1352 }
1353
1354 info!("ToolWorker shutdown completed");
1355 Ok(())
1356 }
1357}
1358
1359// Implement Clone for ToolWorker to enable spawning tasks
1360impl<I: IdempotencyStore + 'static> Clone for ToolWorker<I> {
1361 fn clone(&self) -> Self {
1362 Self {
1363 tools: self.tools.clone(),
1364 default_semaphore: self.default_semaphore.clone(),
1365 resource_limits: self.resource_limits.clone(),
1366 config: self.config.clone(),
1367 idempotency_store: self.idempotency_store.clone(),
1368 metrics: self.metrics.clone(),
1369 app_context: self.app_context.clone(),
1370 }
1371 }
1372}
1373
1374#[cfg(test)]
1375mod tests {
1376 use super::*;
1377 use crate::idempotency::InMemoryIdempotencyStore;
1378 use crate::jobs::Job;
1379 use crate::provider::ApplicationContext;
1380 use uuid::Uuid;
1381
1382 fn test_app_context() -> ApplicationContext {
1383 // Use default configuration for tests
1384 ApplicationContext::default()
1385 }
1386
/// Test tool that either succeeds or fails with a permanent error.
struct MockTool {
    // Name the tool reports, and therefore the key it is registered under.
    name: String,
    // When true, `execute` returns a permanent "Mock failure" error.
    should_fail: bool,
}
1391
/// Test tool whose `execute` always fails with a retriable error.
struct RetriableMockTool {
    // Name the tool reports, and therefore the key it is registered under.
    name: String,
}
1396 #[async_trait]
1397 impl Tool for MockTool {
1398 async fn execute(
1399 &self,
1400 _params: serde_json::Value,
1401 _context: &crate::provider::ApplicationContext,
1402 ) -> Result<JobResult, ToolError> {
1403 if self.should_fail {
1404 Err(ToolError::permanent_string("Mock failure"))
1405 } else {
1406 Ok(JobResult::Success {
1407 value: serde_json::json!({"result": "success"}),
1408 tx_hash: None,
1409 })
1410 }
1411 }
1412
1413 fn name(&self) -> &str {
1414 &self.name
1415 }
1416
1417 fn description(&self) -> &str {
1418 ""
1419 }
1420
1421 fn schema(&self) -> serde_json::Value {
1422 serde_json::json!({
1423 "type": "object",
1424 "additionalProperties": true
1425 })
1426 }
1427 }
1428
1429 #[async_trait]
1430 impl Tool for RetriableMockTool {
1431 async fn execute(
1432 &self,
1433 _params: serde_json::Value,
1434 _context: &crate::provider::ApplicationContext,
1435 ) -> Result<JobResult, ToolError> {
1436 Err(ToolError::retriable_string("Mock retriable failure"))
1437 }
1438
1439 fn name(&self) -> &str {
1440 &self.name
1441 }
1442
1443 fn description(&self) -> &str {
1444 ""
1445 }
1446
1447 fn schema(&self) -> serde_json::Value {
1448 serde_json::json!({
1449 "type": "object",
1450 "additionalProperties": true
1451 })
1452 }
1453 }
1454
1455 #[tokio::test]
1456 async fn test_tool_worker_process_job() {
1457 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1458 ExecutionConfig::default(),
1459 test_app_context(),
1460 );
1461 let tool = Arc::new(MockTool {
1462 name: "test_tool".to_string(),
1463 should_fail: false,
1464 });
1465 worker.register_tool(tool).await;
1466
1467 let job = Job {
1468 job_id: Uuid::new_v4(),
1469 tool_name: "test_tool".to_string(),
1470 params: serde_json::json!({}),
1471 idempotency_key: None,
1472 max_retries: 3,
1473 retry_count: 0,
1474 };
1475
1476 let result = worker.process_job(job).await.unwrap();
1477 match result {
1478 JobResult::Success { .. } => (),
1479 _ => panic!("Expected success"),
1480 }
1481 }
1482
1483 #[tokio::test]
1484 async fn test_tool_worker_with_idempotency() {
1485 let store = Arc::new(InMemoryIdempotencyStore::new());
1486 let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
1487 .with_idempotency_store(store.clone());
1488
1489 let tool = Arc::new(MockTool {
1490 name: "test_tool".to_string(),
1491 should_fail: false,
1492 });
1493 worker.register_tool(tool).await;
1494
1495 let job = Job {
1496 job_id: Uuid::new_v4(),
1497 tool_name: "test_tool".to_string(),
1498 params: serde_json::json!({}),
1499 idempotency_key: Some("test_key".to_string()),
1500 max_retries: 3,
1501 retry_count: 0,
1502 };
1503
1504 // First execution
1505 let result1 = worker.process_job(job.clone()).await.unwrap();
1506 assert!(result1.is_success());
1507
1508 // Second execution should return cached result
1509 let result2 = worker.process_job(job).await.unwrap();
1510 assert!(result2.is_success());
1511 }
1512
1513 #[tokio::test]
1514 async fn test_tool_worker_with_retries() {
1515 let config = ExecutionConfig {
1516 initial_retry_delay: Duration::from_millis(10),
1517 ..Default::default()
1518 };
1519
1520 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
1521 let tool = Arc::new(MockTool {
1522 name: "test_tool".to_string(),
1523 should_fail: true,
1524 });
1525 worker.register_tool(tool).await;
1526
1527 let job = Job {
1528 job_id: Uuid::new_v4(),
1529 tool_name: "test_tool".to_string(),
1530 params: serde_json::json!({}),
1531 idempotency_key: None,
1532 max_retries: 2,
1533 retry_count: 0,
1534 };
1535
1536 let result = worker.process_job(job).await.unwrap();
1537 match result {
1538 JobResult::Failure { .. } => {
1539 assert!(!result.is_retriable()); // Should not be retriable after exhausting retries
1540 }
1541 _ => panic!("Expected failure"),
1542 }
1543 }
1544
1545 #[tokio::test]
1546 async fn test_tool_worker_tool_not_found() {
1547 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1548 ExecutionConfig::default(),
1549 test_app_context(),
1550 );
1551
1552 let job = Job {
1553 job_id: Uuid::new_v4(),
1554 tool_name: "nonexistent_tool".to_string(),
1555 params: serde_json::json!({}),
1556 idempotency_key: None,
1557 max_retries: 0,
1558 retry_count: 0,
1559 };
1560
1561 let result = worker.process_job(job).await;
1562 assert!(result.is_err());
1563 assert!(result
1564 .unwrap_err()
1565 .to_string()
1566 .contains("Tool 'nonexistent_tool' not found"));
1567 }
1568
1569 #[tokio::test]
1570 async fn test_tool_worker_timeout() {
1571 let config = ExecutionConfig {
1572 default_timeout: Duration::from_millis(10), // Very short timeout
1573 ..Default::default()
1574 };
1575
1576 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
1577 let tool = Arc::new(SlowMockTool {
1578 name: "slow_tool".to_string(),
1579 delay: Duration::from_millis(100),
1580 });
1581 worker.register_tool(tool).await;
1582
1583 let job = Job {
1584 job_id: Uuid::new_v4(),
1585 tool_name: "slow_tool".to_string(),
1586 params: serde_json::json!({}),
1587 idempotency_key: None,
1588 max_retries: 1,
1589 retry_count: 0,
1590 };
1591
1592 let result = worker.process_job(job).await.unwrap();
1593 match result {
1594 JobResult::Failure { error, .. } => {
1595 assert!(error.to_string().to_lowercase().contains("timed out"));
1596 }
1597 _ => panic!("Expected timeout failure"),
1598 }
1599 }
1600
1601 #[tokio::test]
1602 async fn test_tool_worker_with_resource_limits() {
1603 let config = ExecutionConfig::default();
1604 let limits = ResourceLimits::default()
1605 .with_limit("solana_rpc", 2)
1606 .with_limit("evm_rpc", 3);
1607
1608 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context())
1609 .with_resource_limits(limits);
1610
1611 // Test semaphore acquisition for different tool types
1612 let solana_tool = Arc::new(MockTool {
1613 name: "solana_test".to_string(),
1614 should_fail: false,
1615 });
1616 let evm_tool = Arc::new(MockTool {
1617 name: "evm_test".to_string(),
1618 should_fail: false,
1619 });
1620 let web_tool = Arc::new(MockTool {
1621 name: "web_test".to_string(),
1622 should_fail: false,
1623 });
1624 let other_tool = Arc::new(MockTool {
1625 name: "other_test".to_string(),
1626 should_fail: false,
1627 });
1628
1629 worker.register_tool(solana_tool).await;
1630 worker.register_tool(evm_tool).await;
1631 worker.register_tool(web_tool).await;
1632 worker.register_tool(other_tool).await;
1633
1634 // Test different tool name patterns
1635 let jobs = vec![
1636 Job {
1637 job_id: Uuid::new_v4(),
1638 tool_name: "solana_test".to_string(),
1639 params: serde_json::json!({}),
1640 idempotency_key: None,
1641 max_retries: 0,
1642 retry_count: 0,
1643 },
1644 Job {
1645 job_id: Uuid::new_v4(),
1646 tool_name: "evm_test".to_string(),
1647 params: serde_json::json!({}),
1648 idempotency_key: None,
1649 max_retries: 0,
1650 retry_count: 0,
1651 },
1652 Job {
1653 job_id: Uuid::new_v4(),
1654 tool_name: "web_test".to_string(),
1655 params: serde_json::json!({}),
1656 idempotency_key: None,
1657 max_retries: 0,
1658 retry_count: 0,
1659 },
1660 Job {
1661 job_id: Uuid::new_v4(),
1662 tool_name: "other_test".to_string(),
1663 params: serde_json::json!({}),
1664 idempotency_key: None,
1665 max_retries: 0,
1666 retry_count: 0,
1667 },
1668 ];
1669
1670 // All should succeed
1671 for job in jobs {
1672 let result = worker.process_job(job).await.unwrap();
1673 assert!(result.is_success());
1674 }
1675 }
1676
1677 #[tokio::test]
1678 async fn test_tool_worker_idempotency_disabled() {
1679 let config = ExecutionConfig {
1680 enable_idempotency: false,
1681 ..Default::default()
1682 };
1683
1684 let store = Arc::new(InMemoryIdempotencyStore::new());
1685 let worker =
1686 ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
1687
1688 let tool = Arc::new(MockTool {
1689 name: "test_tool".to_string(),
1690 should_fail: false,
1691 });
1692 worker.register_tool(tool).await;
1693
1694 let job = Job {
1695 job_id: Uuid::new_v4(),
1696 tool_name: "test_tool".to_string(),
1697 params: serde_json::json!({}),
1698 idempotency_key: Some("test_key".to_string()),
1699 max_retries: 0,
1700 retry_count: 0,
1701 };
1702
1703 // First execution
1704 let result1 = worker.process_job(job.clone()).await.unwrap();
1705 assert!(result1.is_success());
1706
1707 // Second execution should NOT use cache due to disabled idempotency
1708 let result2 = worker.process_job(job).await.unwrap();
1709 assert!(result2.is_success());
1710
1711 // Verify the key was never set in the store
1712 assert!(store.get("test_key").await.unwrap().is_none());
1713 }
1714
1715 #[tokio::test]
1716 async fn test_tool_worker_metrics() {
1717 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1718 ExecutionConfig::default(),
1719 test_app_context(),
1720 );
1721 let success_tool = Arc::new(MockTool {
1722 name: "success_tool".to_string(),
1723 should_fail: false,
1724 });
1725 let fail_tool = Arc::new(RetriableMockTool {
1726 name: "fail_tool".to_string(),
1727 });
1728
1729 worker.register_tool(success_tool).await;
1730 worker.register_tool(fail_tool).await;
1731
1732 let metrics = worker.metrics();
1733
1734 // Initial state
1735 assert_eq!(
1736 metrics
1737 .jobs_processed
1738 .load(std::sync::atomic::Ordering::Relaxed),
1739 0
1740 );
1741 assert_eq!(
1742 metrics
1743 .jobs_succeeded
1744 .load(std::sync::atomic::Ordering::Relaxed),
1745 0
1746 );
1747 assert_eq!(
1748 metrics
1749 .jobs_failed
1750 .load(std::sync::atomic::Ordering::Relaxed),
1751 0
1752 );
1753 assert_eq!(
1754 metrics
1755 .jobs_retried
1756 .load(std::sync::atomic::Ordering::Relaxed),
1757 0
1758 );
1759
1760 // Process successful job
1761 let success_job = Job {
1762 job_id: Uuid::new_v4(),
1763 tool_name: "success_tool".to_string(),
1764 params: serde_json::json!({}),
1765 idempotency_key: None,
1766 max_retries: 0,
1767 retry_count: 0,
1768 };
1769 worker.process_job(success_job).await.unwrap();
1770 assert_eq!(
1771 metrics
1772 .jobs_succeeded
1773 .load(std::sync::atomic::Ordering::Relaxed),
1774 1
1775 );
1776
1777 // Process failing job with retries
1778 let fail_job = Job {
1779 job_id: Uuid::new_v4(),
1780 tool_name: "fail_tool".to_string(),
1781 params: serde_json::json!({}),
1782 idempotency_key: None,
1783 max_retries: 2,
1784 retry_count: 0,
1785 };
1786 worker.process_job(fail_job).await.unwrap();
1787 assert_eq!(
1788 metrics
1789 .jobs_failed
1790 .load(std::sync::atomic::Ordering::Relaxed),
1791 1
1792 );
1793 assert_eq!(
1794 metrics
1795 .jobs_retried
1796 .load(std::sync::atomic::Ordering::Relaxed),
1797 1
1798 );
1799 }
1800
1801 #[tokio::test]
1802 async fn test_execution_config_default() {
1803 let config = ExecutionConfig::default();
1804 assert_eq!(config.max_concurrency, 10);
1805 assert_eq!(config.default_timeout, Duration::from_secs(30));
1806 assert_eq!(config.max_retries, 3);
1807 assert_eq!(config.initial_retry_delay, Duration::from_millis(100));
1808 assert_eq!(config.max_retry_delay, Duration::from_secs(10));
1809 assert_eq!(config.idempotency_ttl, Duration::from_secs(3600));
1810 assert!(config.enable_idempotency);
1811 }
1812
1813 #[tokio::test]
1814 async fn test_resource_limits() {
1815 let limits = ResourceLimits::default()
1816 .with_limit("test_resource", 5)
1817 .with_limit("another_resource", 10);
1818
1819 assert!(limits.get_semaphore("test_resource").is_some());
1820 assert!(limits.get_semaphore("another_resource").is_some());
1821 assert!(limits.get_semaphore("nonexistent").is_none());
1822
1823 let default_limits = ResourceLimits::default();
1824 assert!(default_limits.get_semaphore("solana_rpc").is_some());
1825 assert!(default_limits.get_semaphore("evm_rpc").is_some());
1826 assert!(default_limits.get_semaphore("http_api").is_some());
1827 }
1828
1829 #[tokio::test]
1830 async fn test_worker_metrics_default() {
1831 let metrics = WorkerMetrics::default();
1832 assert_eq!(
1833 metrics
1834 .jobs_processed
1835 .load(std::sync::atomic::Ordering::Relaxed),
1836 0
1837 );
1838 assert_eq!(
1839 metrics
1840 .jobs_succeeded
1841 .load(std::sync::atomic::Ordering::Relaxed),
1842 0
1843 );
1844 assert_eq!(
1845 metrics
1846 .jobs_failed
1847 .load(std::sync::atomic::Ordering::Relaxed),
1848 0
1849 );
1850 assert_eq!(
1851 metrics
1852 .jobs_retried
1853 .load(std::sync::atomic::Ordering::Relaxed),
1854 0
1855 );
1856 }
1857
1858 #[tokio::test]
1859 async fn test_tool_worker_clone() {
1860 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1861 ExecutionConfig::default(),
1862 test_app_context(),
1863 );
1864 let tool = Arc::new(MockTool {
1865 name: "test_tool".to_string(),
1866 should_fail: false,
1867 });
1868 worker.register_tool(tool).await;
1869
1870 let cloned_worker = worker.clone();
1871
1872 // Both workers should have access to the same tools
1873 assert_eq!(worker.tools.len(), 1);
1874 assert_eq!(cloned_worker.tools.len(), 1);
1875
1876 // Test processing with cloned worker
1877 let job = Job {
1878 job_id: Uuid::new_v4(),
1879 tool_name: "test_tool".to_string(),
1880 params: serde_json::json!({}),
1881 idempotency_key: None,
1882 max_retries: 0,
1883 retry_count: 0,
1884 };
1885
1886 let result = cloned_worker.process_job(job).await.unwrap();
1887 assert!(result.is_success());
1888 }
1889
1890 #[tokio::test]
1891 async fn test_tool_worker_run_loop() {
1892 use crate::queue::InMemoryJobQueue;
1893
1894 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1895 ExecutionConfig::default(),
1896 test_app_context(),
1897 );
1898 let tool = Arc::new(MockTool {
1899 name: "test_tool".to_string(),
1900 should_fail: false,
1901 });
1902 worker.register_tool(tool).await;
1903
1904 let queue = Arc::new(InMemoryJobQueue::new());
1905
1906 // Enqueue a job
1907 let job = Job {
1908 job_id: Uuid::new_v4(),
1909 tool_name: "test_tool".to_string(),
1910 params: serde_json::json!({}),
1911 idempotency_key: None,
1912 max_retries: 0,
1913 retry_count: 0,
1914 };
1915 queue.enqueue(job).await.unwrap();
1916
1917 // Start the worker run loop with cancellation token
1918 let worker_clone = worker.clone();
1919 let queue_clone = queue.clone();
1920 let cancellation_token = tokio_util::sync::CancellationToken::new();
1921 let token_clone = cancellation_token.clone();
1922
1923 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
1924
1925 // Give it time to process the job
1926 tokio::time::sleep(Duration::from_millis(50)).await;
1927
1928 // Signal shutdown
1929 cancellation_token.cancel();
1930
1931 // Check that metrics were updated
1932 let metrics = worker.metrics();
1933 assert!(
1934 metrics
1935 .jobs_processed
1936 .load(std::sync::atomic::Ordering::Relaxed)
1937 > 0
1938 );
1939
1940 handle.await.unwrap().unwrap();
1941 }
1942
1943 #[tokio::test]
1944 async fn test_idempotency_cache_hit() {
1945 let store = Arc::new(InMemoryIdempotencyStore::new());
1946 let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
1947 .with_idempotency_store(store.clone());
1948
1949 let tool = Arc::new(MockTool {
1950 name: "test_tool".to_string(),
1951 should_fail: false,
1952 });
1953 worker.register_tool(tool).await;
1954
1955 // Pre-populate the cache
1956 let cached_result = JobResult::Success {
1957 value: serde_json::json!({"cached": true}),
1958 tx_hash: Some("cached_tx_hash".to_string()),
1959 };
1960 store
1961 .set(
1962 "cache_key",
1963 Arc::new(cached_result),
1964 Duration::from_secs(60),
1965 )
1966 .await
1967 .unwrap();
1968
1969 let job = Job {
1970 job_id: Uuid::new_v4(),
1971 tool_name: "test_tool".to_string(),
1972 params: serde_json::json!({}),
1973 idempotency_key: Some("cache_key".to_string()),
1974 max_retries: 0,
1975 retry_count: 0,
1976 };
1977
1978 // Should return cached result without executing the tool
1979 let result = worker.process_job(job).await.unwrap();
1980 match result {
1981 JobResult::Success { value, tx_hash } => {
1982 assert_eq!(value, serde_json::json!({"cached": true}));
1983 assert_eq!(tx_hash, Some("cached_tx_hash".to_string()));
1984 }
1985 _ => panic!("Expected cached success result"),
1986 }
1987 }
1988
1989 #[tokio::test]
1990 async fn test_tool_worker_unknown_error_fallback() {
1991 // Create a worker with a job that will fail with max retries
1992 // but have no last_error set to trigger the "Unknown error" fallback
1993 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
1994 ExecutionConfig::default(),
1995 test_app_context(),
1996 );
1997
1998 // Don't register any tool - this will cause tool not found error
1999 let job = Job {
2000 job_id: Uuid::new_v4(),
2001 tool_name: "nonexistent_tool".to_string(),
2002 params: serde_json::json!({}),
2003 idempotency_key: None,
2004 max_retries: 0,
2005 retry_count: 0,
2006 };
2007
2008 // This should fail with tool not found, not unknown error
2009 let result = worker.process_job(job).await;
2010 assert!(result.is_err());
2011
2012 // The unknown error fallback is actually hard to trigger in normal flow
2013 // It would only happen if there's a bug in the retry logic where
2014 // attempts > max_retries but last_error is None
2015 }
2016
2017 #[tokio::test]
2018 async fn test_run_loop_error_handling() {
2019 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2020 ExecutionConfig::default(),
2021 test_app_context(),
2022 );
2023 let error_queue = Arc::new(ErrorQueue::default());
2024
2025 // Start run loop with cancellation token
2026 let worker_clone = worker.clone();
2027 let queue_clone = error_queue.clone();
2028 let cancellation_token = tokio_util::sync::CancellationToken::new();
2029 let token_clone = cancellation_token.clone();
2030
2031 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2032
2033 // Give it time to encounter the error
2034 tokio::time::sleep(Duration::from_millis(50)).await;
2035
2036 // Signal shutdown
2037 cancellation_token.cancel();
2038 handle.await.unwrap().unwrap();
2039 }
2040
2041 #[tokio::test]
2042 async fn test_run_loop_empty_queue() {
2043 use crate::queue::InMemoryJobQueue;
2044
2045 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2046 ExecutionConfig::default(),
2047 test_app_context(),
2048 );
2049 let queue = Arc::new(InMemoryJobQueue::new());
2050
2051 // Start run loop with cancellation token - should encounter Ok(None) from empty queue
2052 let worker_clone = worker.clone();
2053 let queue_clone = queue.clone();
2054 let cancellation_token = tokio_util::sync::CancellationToken::new();
2055 let token_clone = cancellation_token.clone();
2056
2057 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2058
2059 // Give it time to process
2060 tokio::time::sleep(Duration::from_millis(50)).await;
2061
2062 // Signal shutdown
2063 cancellation_token.cancel();
2064 handle.await.unwrap().unwrap();
2065 }
2066
2067 #[tokio::test]
2068 async fn test_run_loop_with_failing_jobs() {
2069 use crate::queue::InMemoryJobQueue;
2070
2071 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2072 ExecutionConfig::default(),
2073 test_app_context(),
2074 );
2075 let fail_tool = Arc::new(MockTool {
2076 name: "fail_tool".to_string(),
2077 should_fail: true,
2078 });
2079 worker.register_tool(fail_tool).await;
2080
2081 let queue = Arc::new(InMemoryJobQueue::new());
2082
2083 // Enqueue a failing job
2084 let job = Job {
2085 job_id: Uuid::new_v4(),
2086 tool_name: "fail_tool".to_string(),
2087 params: serde_json::json!({}),
2088 idempotency_key: None,
2089 max_retries: 0,
2090 retry_count: 0,
2091 };
2092 queue.enqueue(job).await.unwrap();
2093
2094 // Start run loop with cancellation token
2095 let worker_clone = worker.clone();
2096 let queue_clone = queue.clone();
2097 let cancellation_token = tokio_util::sync::CancellationToken::new();
2098 let token_clone = cancellation_token.clone();
2099
2100 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2101
2102 // Give it time to process the failing job
2103 tokio::time::sleep(Duration::from_millis(50)).await;
2104
2105 // Signal shutdown
2106 cancellation_token.cancel();
2107 handle.await.unwrap().unwrap();
2108
2109 // Verify metrics were updated
2110 let metrics = worker.metrics();
2111 assert!(
2112 metrics
2113 .jobs_processed
2114 .load(std::sync::atomic::Ordering::Relaxed)
2115 > 0
2116 );
2117 }
2118
2119 #[tokio::test]
2120 async fn test_comprehensive_metrics_tracking() {
2121 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2122 ExecutionConfig::default(),
2123 test_app_context(),
2124 );
2125 let success_tool = Arc::new(MockTool {
2126 name: "success_tool".to_string(),
2127 should_fail: false,
2128 });
2129 let fail_tool = Arc::new(RetriableMockTool {
2130 name: "fail_tool".to_string(),
2131 });
2132 worker.register_tool(success_tool).await;
2133 worker.register_tool(fail_tool).await;
2134
2135 let metrics = worker.metrics();
2136
2137 // Process a successful job
2138 let success_job = Job {
2139 job_id: Uuid::new_v4(),
2140 tool_name: "success_tool".to_string(),
2141 params: serde_json::json!({}),
2142 idempotency_key: None,
2143 max_retries: 0,
2144 retry_count: 0,
2145 };
2146 let result = worker.process_job(success_job).await.unwrap();
2147 assert!(result.is_success());
2148
2149 // Verify jobs_succeeded was incremented (line 232)
2150 assert_eq!(
2151 metrics
2152 .jobs_succeeded
2153 .load(std::sync::atomic::Ordering::Relaxed),
2154 1
2155 );
2156
2157 // Process a failing job with retries
2158 let fail_job = Job {
2159 job_id: Uuid::new_v4(),
2160 tool_name: "fail_tool".to_string(),
2161 params: serde_json::json!({}),
2162 idempotency_key: None,
2163 max_retries: 2,
2164 retry_count: 0,
2165 };
2166 let result = worker.process_job(fail_job).await.unwrap();
2167 assert!(!result.is_success());
2168
2169 // Verify jobs_retried was incremented (line 250)
2170 assert_eq!(
2171 metrics
2172 .jobs_retried
2173 .load(std::sync::atomic::Ordering::Relaxed),
2174 1
2175 );
2176
2177 // Verify jobs_failed was incremented (line 264)
2178 assert_eq!(
2179 metrics
2180 .jobs_failed
2181 .load(std::sync::atomic::Ordering::Relaxed),
2182 1
2183 );
2184 }
2185
2186 #[tokio::test]
2187 async fn test_debug_logging_in_retries() {
2188 let config = ExecutionConfig {
2189 initial_retry_delay: Duration::from_millis(1),
2190 ..Default::default()
2191 };
2192
2193 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2194 let tool = Arc::new(MockTool {
2195 name: "retry_tool".to_string(),
2196 should_fail: true,
2197 });
2198 worker.register_tool(tool).await;
2199
2200 let job = Job {
2201 job_id: Uuid::new_v4(),
2202 tool_name: "retry_tool".to_string(),
2203 params: serde_json::json!({}),
2204 idempotency_key: None,
2205 max_retries: 1,
2206 retry_count: 0,
2207 };
2208
2209 // This should trigger debug logging in the retry loop (lines 205-208)
2210 let _result = worker.process_job(job).await.unwrap();
2211 }
2212
2213 #[tokio::test]
2214 async fn test_worker_startup_logging() {
2215 use crate::queue::InMemoryJobQueue;
2216
2217 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2218 ExecutionConfig::default(),
2219 test_app_context(),
2220 );
2221 let tool = Arc::new(MockTool {
2222 name: "startup_tool".to_string(),
2223 should_fail: false,
2224 });
2225 worker.register_tool(tool).await;
2226
2227 let queue = Arc::new(InMemoryJobQueue::new());
2228
2229 // This should trigger the startup info log
2230 let worker_clone = worker.clone();
2231 let queue_clone = queue.clone();
2232 let cancellation_token = tokio_util::sync::CancellationToken::new();
2233 let token_clone = cancellation_token.clone();
2234
2235 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2236
2237 // Give it time to start up
2238 tokio::time::sleep(Duration::from_millis(10)).await;
2239
2240 // Signal shutdown
2241 cancellation_token.cancel();
2242 handle.await.unwrap().unwrap();
2243 }
2244
2245 #[tokio::test]
2246 async fn test_timeout_specific_error() {
2247 let config = ExecutionConfig {
2248 default_timeout: Duration::from_millis(1), // Very short timeout
2249 ..Default::default()
2250 };
2251
2252 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2253 let tool = Arc::new(SlowMockTool {
2254 name: "timeout_tool".to_string(),
2255 delay: Duration::from_millis(50),
2256 });
2257 worker.register_tool(tool).await;
2258
2259 let job = Job {
2260 job_id: Uuid::new_v4(),
2261 tool_name: "timeout_tool".to_string(),
2262 params: serde_json::json!({}),
2263 idempotency_key: None,
2264 max_retries: 0,
2265 retry_count: 0,
2266 };
2267
2268 // This should specifically hit the timeout error assignment (line 240)
2269 let result = worker.process_job(job).await.unwrap();
2270 match result {
2271 JobResult::Failure { error, .. } => {
2272 assert!(error.to_string().to_lowercase().contains("timed out"));
2273 }
2274 _ => panic!("Expected timeout failure"),
2275 }
2276 }
2277
2278 #[tokio::test]
2279 async fn test_resource_matching_edge_cases() {
2280 let limits = ResourceLimits::default()
2281 .with_limit("solana_rpc", 1)
2282 .with_limit("evm_rpc", 1);
2283
2284 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2285 ExecutionConfig::default(),
2286 test_app_context(),
2287 )
2288 .with_resource_limits(limits);
2289
2290 // Register tools with different name patterns to exercise line 278
2291 let solana_tool = Arc::new(MockTool {
2292 name: "solana_balance".to_string(), // Should match solana_ pattern
2293 should_fail: false,
2294 });
2295 let evm_tool = Arc::new(MockTool {
2296 name: "evm_call".to_string(), // Should match evm_ pattern
2297 should_fail: false,
2298 });
2299 let web_tool = Arc::new(MockTool {
2300 name: "web_fetch".to_string(), // Should match web_ pattern
2301 should_fail: false,
2302 });
2303 let other_tool = Arc::new(MockTool {
2304 name: "other_operation".to_string(), // Should use default semaphore
2305 should_fail: false,
2306 });
2307
2308 worker.register_tool(solana_tool).await;
2309 worker.register_tool(evm_tool).await;
2310 worker.register_tool(web_tool).await;
2311 worker.register_tool(other_tool).await;
2312
2313 // Process jobs to exercise the acquire_semaphore method
2314 let job1 = Job {
2315 job_id: Uuid::new_v4(),
2316 tool_name: "solana_balance".to_string(),
2317 params: serde_json::json!({}),
2318 idempotency_key: None,
2319 max_retries: 0,
2320 retry_count: 0,
2321 };
2322
2323 let _result = worker.process_job(job1).await.unwrap();
2324
2325 let job2 = Job {
2326 job_id: Uuid::new_v4(),
2327 tool_name: "other_operation".to_string(),
2328 params: serde_json::json!({}),
2329 idempotency_key: None,
2330 max_retries: 0,
2331 retry_count: 0,
2332 };
2333
2334 let _result = worker.process_job(job2).await.unwrap();
2335 }
2336
2337 #[tokio::test]
2338 async fn test_execution_config_custom_values() {
2339 let config = ExecutionConfig {
2340 max_concurrency: 25,
2341 default_timeout: Duration::from_secs(60),
2342 max_retries: 5,
2343 initial_retry_delay: Duration::from_millis(200),
2344 max_retry_delay: Duration::from_secs(20),
2345 idempotency_ttl: Duration::from_secs(7200),
2346 enable_idempotency: false,
2347 };
2348
2349 assert_eq!(config.max_concurrency, 25);
2350 assert_eq!(config.default_timeout, Duration::from_secs(60));
2351 assert_eq!(config.max_retries, 5);
2352 assert_eq!(config.initial_retry_delay, Duration::from_millis(200));
2353 assert_eq!(config.max_retry_delay, Duration::from_secs(20));
2354 assert_eq!(config.idempotency_ttl, Duration::from_secs(7200));
2355 assert!(!config.enable_idempotency);
2356 }
2357
2358 #[tokio::test]
2359 async fn test_resource_limits_new() {
2360 let limits = ResourceLimits::default();
2361 assert!(limits.get_semaphore("any_resource").is_none());
2362 }
2363
2364 #[tokio::test]
2365 async fn test_resource_limits_with_limit_chaining() {
2366 let limits = ResourceLimits::default()
2367 .with_limit("first", 1)
2368 .with_limit("second", 2)
2369 .with_limit("third", 3);
2370
2371 assert!(limits.get_semaphore("first").is_some());
2372 assert!(limits.get_semaphore("second").is_some());
2373 assert!(limits.get_semaphore("third").is_some());
2374 assert!(limits.get_semaphore("fourth").is_none());
2375 }
2376
2377 #[tokio::test]
2378 async fn test_tool_worker_new_custom_config() {
2379 let config = ExecutionConfig {
2380 max_concurrency: 5,
2381 ..Default::default()
2382 };
2383 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2384
2385 // Verify worker was created with custom concurrency
2386 // We can't directly access default_semaphore, but we can verify through the config
2387 assert_eq!(worker.config.max_concurrency, 5);
2388 }
2389
2390 #[tokio::test]
2391 async fn test_tool_worker_register_tool_replacement() {
2392 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2393 ExecutionConfig::default(),
2394 test_app_context(),
2395 );
2396
2397 // Register initial tool
2398 let tool1 = Arc::new(MockTool {
2399 name: "test_tool".to_string(),
2400 should_fail: false,
2401 });
2402 worker.register_tool(tool1).await;
2403 assert_eq!(worker.tools.len(), 1);
2404
2405 // Register tool with same name - should replace
2406 let tool2 = Arc::new(MockTool {
2407 name: "test_tool".to_string(),
2408 should_fail: true,
2409 });
2410 worker.register_tool(tool2).await;
2411 assert_eq!(worker.tools.len(), 1);
2412
2413 // Verify the new tool was used (it should fail)
2414 let job = Job {
2415 job_id: Uuid::new_v4(),
2416 tool_name: "test_tool".to_string(),
2417 params: serde_json::json!({}),
2418 idempotency_key: None,
2419 max_retries: 0,
2420 retry_count: 0,
2421 };
2422
2423 let result = worker.process_job(job).await.unwrap();
2424 assert!(!result.is_success());
2425 }
2426
2427 #[tokio::test]
2428 async fn test_idempotency_store_error_handling() {
2429 let store = Arc::new(ErrorIdempotencyStore::default());
2430 let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2431 .with_idempotency_store(store.clone());
2432
2433 let tool = Arc::new(MockTool {
2434 name: "test_tool".to_string(),
2435 should_fail: false,
2436 });
2437 worker.register_tool(tool).await;
2438
2439 let job = Job {
2440 job_id: Uuid::new_v4(),
2441 tool_name: "test_tool".to_string(),
2442 params: serde_json::json!({}),
2443 idempotency_key: Some("test_key".to_string()),
2444 max_retries: 0,
2445 retry_count: 0,
2446 };
2447
2448 // Should fail with idempotency store error
2449 let result = worker.process_job(job).await;
2450 assert!(result.is_err());
2451 assert!(result.unwrap_err().to_string().contains("Store error"));
2452 }
2453
2454 #[tokio::test]
2455 async fn test_no_idempotency_key_no_cache_check() {
2456 let store = Arc::new(InMemoryIdempotencyStore::new());
2457 let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2458 .with_idempotency_store(store.clone());
2459
2460 let tool = Arc::new(MockTool {
2461 name: "test_tool".to_string(),
2462 should_fail: false,
2463 });
2464 worker.register_tool(tool).await;
2465
2466 let job = Job {
2467 job_id: Uuid::new_v4(),
2468 tool_name: "test_tool".to_string(),
2469 params: serde_json::json!({}),
2470 idempotency_key: None, // No idempotency key
2471 max_retries: 0,
2472 retry_count: 0,
2473 };
2474
2475 // Should execute normally without cache check
2476 let result = worker.process_job(job).await.unwrap();
2477 assert!(result.is_success());
2478 }
2479
2480 #[tokio::test]
2481 async fn test_no_idempotency_store_no_cache() {
2482 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2483 ExecutionConfig::default(),
2484 test_app_context(),
2485 );
2486 // Don't set idempotency store
2487
2488 let tool = Arc::new(MockTool {
2489 name: "test_tool".to_string(),
2490 should_fail: false,
2491 });
2492 worker.register_tool(tool).await;
2493
2494 let job = Job {
2495 job_id: Uuid::new_v4(),
2496 tool_name: "test_tool".to_string(),
2497 params: serde_json::json!({}),
2498 idempotency_key: Some("test_key".to_string()),
2499 max_retries: 0,
2500 retry_count: 0,
2501 };
2502
2503 // Should execute normally without cache check
2504 let result = worker.process_job(job).await.unwrap();
2505 assert!(result.is_success());
2506 }
2507
2508 #[tokio::test]
2509 async fn test_cache_result_with_no_idempotency_key() {
2510 let store = Arc::new(InMemoryIdempotencyStore::new());
2511 let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
2512 .with_idempotency_store(store.clone());
2513
2514 let tool = Arc::new(MockTool {
2515 name: "test_tool".to_string(),
2516 should_fail: false,
2517 });
2518 worker.register_tool(tool).await;
2519
2520 let job = Job {
2521 job_id: Uuid::new_v4(),
2522 tool_name: "test_tool".to_string(),
2523 params: serde_json::json!({}),
2524 idempotency_key: None, // No idempotency key
2525 max_retries: 0,
2526 retry_count: 0,
2527 };
2528
2529 // Should execute and not attempt to cache
2530 let result = worker.process_job(job).await.unwrap();
2531 assert!(result.is_success());
2532 }
2533
2534 #[tokio::test]
2535 async fn test_cache_result_with_idempotency_disabled() {
2536 let config = ExecutionConfig {
2537 enable_idempotency: false,
2538 ..Default::default()
2539 };
2540
2541 let store = Arc::new(InMemoryIdempotencyStore::new());
2542 let worker =
2543 ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
2544
2545 let tool = Arc::new(MockTool {
2546 name: "test_tool".to_string(),
2547 should_fail: false,
2548 });
2549 worker.register_tool(tool).await;
2550
2551 let job = Job {
2552 job_id: Uuid::new_v4(),
2553 tool_name: "test_tool".to_string(),
2554 params: serde_json::json!({}),
2555 idempotency_key: Some("test_key".to_string()),
2556 max_retries: 0,
2557 retry_count: 0,
2558 };
2559
2560 // Should execute but not cache due to disabled idempotency
2561 let result = worker.process_job(job).await.unwrap();
2562 assert!(result.is_success());
2563
2564 // Verify nothing was cached
2565 assert!(store.get("test_key").await.unwrap().is_none());
2566 }
2567
2568 #[tokio::test]
2569 async fn test_cache_result_with_no_store() {
2570 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2571 ExecutionConfig::default(),
2572 test_app_context(),
2573 );
2574 // Don't set idempotency store
2575
2576 let tool = Arc::new(MockTool {
2577 name: "test_tool".to_string(),
2578 should_fail: false,
2579 });
2580 worker.register_tool(tool).await;
2581
2582 let job = Job {
2583 job_id: Uuid::new_v4(),
2584 tool_name: "test_tool".to_string(),
2585 params: serde_json::json!({}),
2586 idempotency_key: Some("test_key".to_string()),
2587 max_retries: 0,
2588 retry_count: 0,
2589 };
2590
2591 // Should execute but not cache due to no store
2592 let result = worker.process_job(job).await.unwrap();
2593 assert!(result.is_success());
2594 }
2595
// Tests for `create_backoff_strategy` and `wait_with_backoff` were removed:
// these methods are now handled by the unified retry helper in retry.rs.
2598
2599 #[tokio::test]
2600 async fn test_acquire_semaphore_web_prefix() {
2601 let limits = ResourceLimits::default().with_limit("http_api", 1);
2602
2603 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2604 ExecutionConfig::default(),
2605 test_app_context(),
2606 )
2607 .with_resource_limits(limits);
2608
2609 // Test web_ prefix maps to http_api
2610 let _permit = worker.acquire_semaphore("web_fetch").await.unwrap();
2611 }
2612
2613 #[tokio::test]
2614 async fn test_acquire_semaphore_no_matching_resource() {
2615 let limits = ResourceLimits::default(); // Empty limits
2616
2617 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2618 ExecutionConfig::default(),
2619 test_app_context(),
2620 )
2621 .with_resource_limits(limits);
2622
2623 // Should fall back to default semaphore
2624 let _permit = worker.acquire_semaphore("solana_test").await.unwrap();
2625 let _permit = worker.acquire_semaphore("random_tool").await.unwrap();
2626 }
2627
2628 #[tokio::test]
2629 async fn test_run_loop_job_processing_success_logging() {
2630 use crate::queue::InMemoryJobQueue;
2631
2632 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2633 ExecutionConfig::default(),
2634 test_app_context(),
2635 );
2636 let tool = Arc::new(MockTool {
2637 name: "log_test_tool".to_string(),
2638 should_fail: false,
2639 });
2640 worker.register_tool(tool).await;
2641
2642 let queue = Arc::new(InMemoryJobQueue::new());
2643
2644 // Enqueue a successful job
2645 let job = Job {
2646 job_id: Uuid::new_v4(),
2647 tool_name: "log_test_tool".to_string(),
2648 params: serde_json::json!({}),
2649 idempotency_key: None,
2650 max_retries: 0,
2651 retry_count: 0,
2652 };
2653 queue.enqueue(job).await.unwrap();
2654
2655 let worker_clone = worker.clone();
2656 let queue_clone = queue.clone();
2657 let cancellation_token = tokio_util::sync::CancellationToken::new();
2658 let token_clone = cancellation_token.clone();
2659
2660 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2661
2662 // Give it time to process
2663 tokio::time::sleep(Duration::from_millis(50)).await;
2664
2665 cancellation_token.cancel();
2666 handle.await.unwrap().unwrap();
2667
2668 // Check that jobs_processed was incremented (line 1274)
2669 let metrics = worker.metrics();
2670 assert!(
2671 metrics
2672 .jobs_processed
2673 .load(std::sync::atomic::Ordering::Relaxed)
2674 > 0
2675 );
2676 }
2677
2678 #[tokio::test]
2679 async fn test_run_loop_worker_error_logging() {
2680 use crate::queue::InMemoryJobQueue;
2681
2682 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
2683 ExecutionConfig::default(),
2684 test_app_context(),
2685 );
2686 // Don't register any tools to cause WorkerError
2687
2688 let queue = Arc::new(InMemoryJobQueue::new());
2689
2690 // Enqueue a job for non-existent tool
2691 let job = Job {
2692 job_id: Uuid::new_v4(),
2693 tool_name: "nonexistent_tool".to_string(),
2694 params: serde_json::json!({}),
2695 idempotency_key: None,
2696 max_retries: 0,
2697 retry_count: 0,
2698 };
2699 queue.enqueue(job).await.unwrap();
2700
2701 let worker_clone = worker.clone();
2702 let queue_clone = queue.clone();
2703 let cancellation_token = tokio_util::sync::CancellationToken::new();
2704 let token_clone = cancellation_token.clone();
2705
2706 let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
2707
2708 // Give it time to process and hit the error path
2709 tokio::time::sleep(Duration::from_millis(50)).await;
2710
2711 cancellation_token.cancel();
2712 handle.await.unwrap().unwrap();
2713 }
2714
2715 #[tokio::test]
2716 async fn test_semaphore_acquisition_error() {
2717 // This test is tricky since semaphore acquisition rarely fails
2718 // But we can test the error path by creating a scenario where it might
2719 let config = ExecutionConfig {
2720 max_concurrency: 1,
2721 ..Default::default()
2722 };
2723
2724 let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
2725 let tool = Arc::new(MockTool {
2726 name: "test_tool".to_string(),
2727 should_fail: false,
2728 });
2729 worker.register_tool(tool).await;
2730
2731 let job = Job {
2732 job_id: Uuid::new_v4(),
2733 tool_name: "test_tool".to_string(),
2734 params: serde_json::json!({}),
2735 idempotency_key: None,
2736 max_retries: 0,
2737 retry_count: 0,
2738 };
2739
2740 // Normal case should work
2741 let result = worker.process_job(job).await;
2742 assert!(result.is_ok());
2743 }
2744
/// `MockTool::description` is expected to return the empty string.
#[test]
fn test_tool_trait_description_method() {
    let mock = MockTool {
        name: String::from("test"),
        should_fail: false,
    };

    let description = mock.description();
    assert_eq!(description, "");
}
2755
/// `MockTool::name` is expected to return the name the tool was built with.
#[test]
fn test_mock_tool_name_method() {
    let mock = MockTool {
        name: String::from("test_name"),
        should_fail: false,
    };

    let reported_name = mock.name();
    assert_eq!(reported_name, "test_name");
}
2765
/// `SlowMockTool` reports the name it was built with and an empty description.
#[test]
fn test_slow_mock_tool_name_method() {
    let slow_tool = SlowMockTool {
        name: String::from("slow_test"),
        delay: Duration::from_millis(1),
    };

    let reported_name = slow_tool.name();
    assert_eq!(reported_name, "slow_test");

    let description = slow_tool.description();
    assert_eq!(description, "");
}
2776
/// Smoke test: an `ErrorQueue` can be built through `Default`.
#[test]
fn test_error_queue_new() {
    // Construction is the whole test; the value is never used.
    let _queue: ErrorQueue = Default::default();
}
2782
2783 #[derive(Default)]
2784 struct ErrorIdempotencyStore {
2785 _phantom: std::marker::PhantomData<()>,
2786 }
2787
2788 #[async_trait]
2789 impl crate::idempotency::IdempotencyStore for ErrorIdempotencyStore {
2790 async fn get(&self, _key: &str) -> anyhow::Result<Option<Arc<JobResult>>> {
2791 Err(anyhow::anyhow!("Store error"))
2792 }
2793
2794 async fn set(
2795 &self,
2796 _key: &str,
2797 _result: Arc<JobResult>,
2798 _ttl: Duration,
2799 ) -> anyhow::Result<()> {
2800 Err(anyhow::anyhow!("Store error"))
2801 }
2802
2803 async fn remove(&self, _key: &str) -> anyhow::Result<()> {
2804 Err(anyhow::anyhow!("Store error"))
2805 }
2806 }
2807
2808 struct SlowMockTool {
2809 name: String,
2810 delay: Duration,
2811 }
2812
2813 #[async_trait]
2814 impl Tool for SlowMockTool {
2815 async fn execute(
2816 &self,
2817 _params: serde_json::Value,
2818 _context: &crate::provider::ApplicationContext,
2819 ) -> Result<JobResult, ToolError> {
2820 tokio::time::sleep(self.delay).await;
2821 Ok(JobResult::Success {
2822 value: serde_json::json!({"result": "slow_success"}),
2823 tx_hash: None,
2824 })
2825 }
2826
2827 fn name(&self) -> &str {
2828 &self.name
2829 }
2830
2831 fn description(&self) -> &str {
2832 ""
2833 }
2834
2835 fn schema(&self) -> serde_json::Value {
2836 serde_json::json!({
2837 "type": "object",
2838 "additionalProperties": true
2839 })
2840 }
2841 }
2842
2843 #[derive(Default)]
2844 struct ErrorQueue {
2845 _phantom: std::marker::PhantomData<()>,
2846 }
2847
2848 #[async_trait]
2849 impl crate::queue::JobQueue for ErrorQueue {
2850 async fn enqueue(&self, _job: crate::jobs::Job) -> anyhow::Result<()> {
2851 Err(anyhow::anyhow!("Queue error"))
2852 }
2853
2854 async fn dequeue(&self) -> anyhow::Result<Option<crate::jobs::Job>> {
2855 Err(anyhow::anyhow!("Dequeue error"))
2856 }
2857
2858 async fn dequeue_with_timeout(
2859 &self,
2860 _timeout: Duration,
2861 ) -> anyhow::Result<Option<crate::jobs::Job>> {
2862 Err(anyhow::anyhow!("Dequeue timeout error"))
2863 }
2864
2865 async fn len(&self) -> anyhow::Result<usize> {
2866 Err(anyhow::anyhow!("Len error"))
2867 }
2868 }
2869}