use async_trait::async_trait;
use dashmap::DashMap;
use std::collections::HashMap;
use std::sync::Arc;
use std::time::Duration;
use tokio::sync::{OwnedSemaphorePermit, Semaphore};
use tracing::{debug, error, info, warn};
use crate::error::{ToolError, WorkerError};
use crate::idempotency::IdempotencyStore;
use crate::jobs::{Job, JobResult};
use crate::queue::JobQueue;
use crate::signer::SignerContext;
#[async_trait]
pub trait Tool: Send + Sync {
async fn execute(
&self,
params: serde_json::Value,
context: &crate::provider::ApplicationContext,
) -> Result<JobResult, ToolError>;
fn name(&self) -> &str;
fn description(&self) -> &str;
fn schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"additionalProperties": true
})
}
}
#[derive(Debug, Clone)]
pub struct ExecutionConfig {
pub max_concurrency: usize,
pub default_timeout: Duration,
pub max_retries: u32,
pub initial_retry_delay: Duration,
pub max_retry_delay: Duration,
pub idempotency_ttl: Duration,
pub enable_idempotency: bool,
}
impl Default for ExecutionConfig {
fn default() -> Self {
Self {
max_concurrency: 10,
default_timeout: Duration::from_secs(30),
max_retries: 3,
initial_retry_delay: Duration::from_millis(100),
max_retry_delay: Duration::from_secs(10),
idempotency_ttl: Duration::from_secs(3600), enable_idempotency: true,
}
}
}
#[derive(Debug, Clone)]
pub struct ResourceLimits {
semaphores: Arc<HashMap<String, Arc<Semaphore>>>,
}
impl ResourceLimits {
pub fn with_limit(mut self, resource: impl Into<String>, limit: usize) -> Self {
let semaphores = Arc::make_mut(&mut self.semaphores);
semaphores.insert(resource.into(), Arc::new(Semaphore::new(limit)));
self
}
pub fn get_semaphore(&self, resource: &str) -> Option<Arc<Semaphore>> {
self.semaphores.get(resource).cloned()
}
}
impl Default for ResourceLimits {
fn default() -> Self {
let mut semaphores = HashMap::new();
semaphores.insert("solana_rpc".to_string(), Arc::new(Semaphore::new(5)));
semaphores.insert("evm_rpc".to_string(), Arc::new(Semaphore::new(10)));
semaphores.insert("http_api".to_string(), Arc::new(Semaphore::new(20)));
Self {
semaphores: Arc::new(semaphores),
}
}
}
pub struct ToolWorker<I: IdempotencyStore + 'static> {
tools: Arc<DashMap<String, Arc<dyn Tool>>>,
default_semaphore: Arc<Semaphore>,
resource_limits: ResourceLimits,
config: ExecutionConfig,
idempotency_store: Option<Arc<I>>,
metrics: Arc<WorkerMetrics>,
app_context: crate::provider::ApplicationContext,
}
#[derive(Debug, Default)]
pub struct WorkerMetrics {
pub jobs_processed: std::sync::atomic::AtomicU64,
pub jobs_succeeded: std::sync::atomic::AtomicU64,
pub jobs_failed: std::sync::atomic::AtomicU64,
pub jobs_retried: std::sync::atomic::AtomicU64,
}
impl<I: IdempotencyStore + 'static> std::fmt::Debug for ToolWorker<I> {
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("ToolWorker")
.field("tools_count", &self.tools.len())
.field("config", &self.config)
.finish_non_exhaustive()
}
}
impl<I: IdempotencyStore + 'static> ToolWorker<I> {
pub fn new(config: ExecutionConfig, app_context: crate::provider::ApplicationContext) -> Self {
Self {
tools: Arc::new(DashMap::new()),
default_semaphore: Arc::new(Semaphore::new(config.max_concurrency)),
resource_limits: ResourceLimits::default(),
config,
idempotency_store: None,
metrics: Arc::new(WorkerMetrics::default()),
app_context,
}
}
pub fn with_idempotency_store(mut self, store: Arc<I>) -> Self {
self.idempotency_store = Some(store);
self
}
pub fn with_resource_limits(mut self, limits: ResourceLimits) -> Self {
self.resource_limits = limits;
self
}
pub async fn register_tool(&self, tool: Arc<dyn Tool>) {
self.tools.insert(tool.name().to_string(), tool);
}
pub fn metrics(&self) -> &WorkerMetrics {
&self.metrics
}
pub async fn process_job(&self, mut job: Job) -> Result<JobResult, WorkerError> {
if let Some(cached_result) = self.check_idempotency_cache(&job).await? {
return Ok(Arc::try_unwrap(cached_result).unwrap_or_else(|arc| (*arc).clone()));
}
if let Ok(signer) = SignerContext::current().await {
if let Some(user_id) = signer.user_id() {
if let Err(rate_limit_error) =
self.app_context.rate_limiter.check_rate_limit(&user_id)
{
return Ok(JobResult::Failure {
error: rate_limit_error,
});
}
}
}
let _permit = self.acquire_semaphore(&job.tool_name).await.map_err(|e| {
WorkerError::SemaphoreAcquisition {
tool_name: job.tool_name.clone(),
source_message: e.to_string(),
}
})?;
let tool = self.get_tool(&job.tool_name).await?;
let result = self.execute_with_retries(tool, &mut job).await;
if let Ok(ref job_result) = result {
self.cache_result(&job, job_result).await;
}
result
}
async fn check_idempotency_cache(
&self,
job: &Job,
) -> Result<Option<Arc<JobResult>>, WorkerError> {
if let Some(ref idempotency_key) = job.idempotency_key {
if self.config.enable_idempotency {
if let Some(ref store) = self.idempotency_store {
match store.get(idempotency_key).await {
Ok(Some(cached_result)) => {
info!(
"Returning cached result for idempotency key: {}",
idempotency_key
);
return Ok(Some(cached_result));
}
Ok(None) => {} Err(e) => {
return Err(WorkerError::IdempotencyStore {
source_message: e.to_string(),
});
}
}
}
}
}
Ok(None)
}
async fn get_tool(&self, tool_name: &str) -> Result<Arc<dyn Tool>, WorkerError> {
self.tools
.get(tool_name)
.ok_or_else(|| WorkerError::ToolNotFound {
tool_name: tool_name.to_string(),
})
.map(|entry| (*entry).clone())
}
async fn execute_with_retries(
&self,
tool: Arc<dyn Tool>,
job: &mut Job,
) -> Result<JobResult, WorkerError> {
use crate::retry::{retry_async, ErrorClass, RetryConfig};
let retry_config = RetryConfig {
max_retries: job.max_retries,
base_delay_ms: self.config.initial_retry_delay.as_millis() as u64,
max_delay_ms: self.config.max_retry_delay.as_millis() as u64,
backoff_multiplier: 2.0,
use_jitter: true,
};
let result = retry_async(
|| async { self.execute_single_attempt(&tool, &job.params).await },
|error| {
if error.is_retriable() {
ErrorClass::Retryable
} else {
ErrorClass::Permanent
}
},
&retry_config,
&format!("job_{}", job.job_id),
)
.await;
match result {
Ok(job_result) => {
self.metrics
.jobs_succeeded
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
Ok(job_result)
}
Err(tool_error) => {
self.metrics
.jobs_failed
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
if retry_config.max_retries > 0 {
self.metrics
.jobs_retried
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}
Ok(JobResult::Failure { error: tool_error })
}
}
}
async fn execute_single_attempt(
&self,
tool: &Arc<dyn Tool>,
params: &serde_json::Value,
) -> Result<JobResult, ToolError> {
let result = tokio::time::timeout(
self.config.default_timeout,
tool.execute(params.clone(), &self.app_context),
)
.await;
match result {
Ok(Ok(job_result)) => Ok(job_result),
Ok(Err(e)) => {
warn!("Tool execution failed: {}", e);
Err(e)
}
Err(_) => {
let error_msg = format!(
"Tool execution timed out after {:?}",
self.config.default_timeout
);
warn!("{}", error_msg);
Err(ToolError::retriable_string(error_msg))
}
}
}
async fn cache_result(&self, job: &Job, job_result: &JobResult) {
if let Some(ref idempotency_key) = job.idempotency_key {
if self.config.enable_idempotency {
if let Some(ref store) = self.idempotency_store {
let _ = store
.set(
idempotency_key,
Arc::new(job_result.clone()),
self.config.idempotency_ttl,
)
.await;
}
}
}
}
async fn acquire_semaphore(
&self,
tool_name: &str,
) -> Result<OwnedSemaphorePermit, Box<dyn std::error::Error + Send + Sync>> {
let resource_name = match tool_name {
name if name.starts_with("solana_") => "solana_rpc",
name if name.starts_with("evm_") => "evm_rpc",
name if name.starts_with("web_") => "http_api",
_ => "",
};
if !resource_name.is_empty() {
if let Some(semaphore) = self.resource_limits.get_semaphore(resource_name) {
return Ok(semaphore.acquire_owned().await?);
}
}
Ok(self.default_semaphore.clone().acquire_owned().await?)
}
pub async fn run<Q: JobQueue>(
&self,
queue: Arc<Q>,
cancellation_token: tokio_util::sync::CancellationToken,
) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
info!(
"Starting ToolWorker with {} tools registered",
self.tools.len()
);
while !cancellation_token.is_cancelled() {
tokio::select! {
_ = cancellation_token.cancelled() => {
info!("Worker shutdown requested, stopping job processing");
break;
}
result = queue.dequeue_with_timeout(Duration::from_secs(5)) => {
match result {
Ok(Some(job)) => {
let job_id = job.job_id;
let tool_name = job.tool_name.clone();
self.metrics
.jobs_processed
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
let worker = self.clone();
tokio::spawn(async move {
match worker.process_job(job).await {
Ok(job_result) => {
if job_result.is_success() {
info!("Job {} ({}) completed successfully", job_id, tool_name);
} else {
warn!(
"Job {} ({}) failed: {:?}",
job_id, tool_name, job_result
);
}
}
Err(e) => {
error!("Job {} ({}) processing error: {}", job_id, tool_name, e);
}
}
});
}
Ok(None) => {
debug!("No jobs available in queue");
}
Err(e) => {
error!("Failed to dequeue job: {}", e);
tokio::time::sleep(Duration::from_secs(1)).await;
}
}
}
}
}
info!("ToolWorker shutdown completed");
Ok(())
}
}
impl<I: IdempotencyStore + 'static> Clone for ToolWorker<I> {
fn clone(&self) -> Self {
Self {
tools: self.tools.clone(),
default_semaphore: self.default_semaphore.clone(),
resource_limits: self.resource_limits.clone(),
config: self.config.clone(),
idempotency_store: self.idempotency_store.clone(),
metrics: self.metrics.clone(),
app_context: self.app_context.clone(),
}
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::idempotency::InMemoryIdempotencyStore;
use crate::jobs::Job;
use crate::provider::ApplicationContext;
use uuid::Uuid;
fn test_app_context() -> ApplicationContext {
ApplicationContext::default()
}
struct MockTool {
name: String,
should_fail: bool,
}
struct RetriableMockTool {
name: String,
}
#[async_trait]
impl Tool for MockTool {
async fn execute(
&self,
_params: serde_json::Value,
_context: &crate::provider::ApplicationContext,
) -> Result<JobResult, ToolError> {
if self.should_fail {
Err(ToolError::permanent_string("Mock failure"))
} else {
Ok(JobResult::Success {
value: serde_json::json!({"result": "success"}),
tx_hash: None,
})
}
}
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
""
}
fn schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"additionalProperties": true
})
}
}
#[async_trait]
impl Tool for RetriableMockTool {
async fn execute(
&self,
_params: serde_json::Value,
_context: &crate::provider::ApplicationContext,
) -> Result<JobResult, ToolError> {
Err(ToolError::retriable_string("Mock retriable failure"))
}
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
""
}
fn schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"additionalProperties": true
})
}
}
#[tokio::test]
async fn test_tool_worker_process_job() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 3,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
match result {
JobResult::Success { .. } => (),
_ => panic!("Expected success"),
}
}
#[tokio::test]
async fn test_tool_worker_with_idempotency() {
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
.with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 3,
retry_count: 0,
};
let result1 = worker.process_job(job.clone()).await.unwrap();
assert!(result1.is_success());
let result2 = worker.process_job(job).await.unwrap();
assert!(result2.is_success());
}
#[tokio::test]
async fn test_tool_worker_with_retries() {
let config = ExecutionConfig {
initial_retry_delay: Duration::from_millis(10),
..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: true,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 2,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
match result {
JobResult::Failure { .. } => {
assert!(!result.is_retriable()); }
_ => panic!("Expected failure"),
}
}
#[tokio::test]
async fn test_tool_worker_tool_not_found() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "nonexistent_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await;
assert!(result.is_err());
assert!(result
.unwrap_err()
.to_string()
.contains("Tool 'nonexistent_tool' not found"));
}
#[tokio::test]
async fn test_tool_worker_timeout() {
let config = ExecutionConfig {
default_timeout: Duration::from_millis(10), ..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
let tool = Arc::new(SlowMockTool {
name: "slow_tool".to_string(),
delay: Duration::from_millis(100),
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "slow_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 1,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
match result {
JobResult::Failure { error, .. } => {
assert!(error.to_string().to_lowercase().contains("timed out"));
}
_ => panic!("Expected timeout failure"),
}
}
#[tokio::test]
async fn test_tool_worker_with_resource_limits() {
let config = ExecutionConfig::default();
let limits = ResourceLimits::default()
.with_limit("solana_rpc", 2)
.with_limit("evm_rpc", 3);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context())
.with_resource_limits(limits);
let solana_tool = Arc::new(MockTool {
name: "solana_test".to_string(),
should_fail: false,
});
let evm_tool = Arc::new(MockTool {
name: "evm_test".to_string(),
should_fail: false,
});
let web_tool = Arc::new(MockTool {
name: "web_test".to_string(),
should_fail: false,
});
let other_tool = Arc::new(MockTool {
name: "other_test".to_string(),
should_fail: false,
});
worker.register_tool(solana_tool).await;
worker.register_tool(evm_tool).await;
worker.register_tool(web_tool).await;
worker.register_tool(other_tool).await;
let jobs = vec![
Job {
job_id: Uuid::new_v4(),
tool_name: "solana_test".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
},
Job {
job_id: Uuid::new_v4(),
tool_name: "evm_test".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
},
Job {
job_id: Uuid::new_v4(),
tool_name: "web_test".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
},
Job {
job_id: Uuid::new_v4(),
tool_name: "other_test".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
},
];
for job in jobs {
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
}
#[tokio::test]
async fn test_tool_worker_idempotency_disabled() {
let config = ExecutionConfig {
enable_idempotency: false,
..Default::default()
};
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker =
ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result1 = worker.process_job(job.clone()).await.unwrap();
assert!(result1.is_success());
let result2 = worker.process_job(job).await.unwrap();
assert!(result2.is_success());
assert!(store.get("test_key").await.unwrap().is_none());
}
#[tokio::test]
async fn test_tool_worker_metrics() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let success_tool = Arc::new(MockTool {
name: "success_tool".to_string(),
should_fail: false,
});
let fail_tool = Arc::new(RetriableMockTool {
name: "fail_tool".to_string(),
});
worker.register_tool(success_tool).await;
worker.register_tool(fail_tool).await;
let metrics = worker.metrics();
assert_eq!(
metrics
.jobs_processed
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_succeeded
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_failed
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_retried
.load(std::sync::atomic::Ordering::Relaxed),
0
);
let success_job = Job {
job_id: Uuid::new_v4(),
tool_name: "success_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
worker.process_job(success_job).await.unwrap();
assert_eq!(
metrics
.jobs_succeeded
.load(std::sync::atomic::Ordering::Relaxed),
1
);
let fail_job = Job {
job_id: Uuid::new_v4(),
tool_name: "fail_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 2,
retry_count: 0,
};
worker.process_job(fail_job).await.unwrap();
assert_eq!(
metrics
.jobs_failed
.load(std::sync::atomic::Ordering::Relaxed),
1
);
assert_eq!(
metrics
.jobs_retried
.load(std::sync::atomic::Ordering::Relaxed),
1
);
}
#[tokio::test]
async fn test_execution_config_default() {
let config = ExecutionConfig::default();
assert_eq!(config.max_concurrency, 10);
assert_eq!(config.default_timeout, Duration::from_secs(30));
assert_eq!(config.max_retries, 3);
assert_eq!(config.initial_retry_delay, Duration::from_millis(100));
assert_eq!(config.max_retry_delay, Duration::from_secs(10));
assert_eq!(config.idempotency_ttl, Duration::from_secs(3600));
assert!(config.enable_idempotency);
}
#[tokio::test]
async fn test_resource_limits() {
let limits = ResourceLimits::default()
.with_limit("test_resource", 5)
.with_limit("another_resource", 10);
assert!(limits.get_semaphore("test_resource").is_some());
assert!(limits.get_semaphore("another_resource").is_some());
assert!(limits.get_semaphore("nonexistent").is_none());
let default_limits = ResourceLimits::default();
assert!(default_limits.get_semaphore("solana_rpc").is_some());
assert!(default_limits.get_semaphore("evm_rpc").is_some());
assert!(default_limits.get_semaphore("http_api").is_some());
}
#[tokio::test]
async fn test_worker_metrics_default() {
let metrics = WorkerMetrics::default();
assert_eq!(
metrics
.jobs_processed
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_succeeded
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_failed
.load(std::sync::atomic::Ordering::Relaxed),
0
);
assert_eq!(
metrics
.jobs_retried
.load(std::sync::atomic::Ordering::Relaxed),
0
);
}
#[tokio::test]
async fn test_tool_worker_clone() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let cloned_worker = worker.clone();
assert_eq!(worker.tools.len(), 1);
assert_eq!(cloned_worker.tools.len(), 1);
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = cloned_worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
#[tokio::test]
async fn test_tool_worker_run_loop() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let queue = Arc::new(InMemoryJobQueue::new());
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
queue.enqueue(job).await.unwrap();
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
let metrics = worker.metrics();
assert!(
metrics
.jobs_processed
.load(std::sync::atomic::Ordering::Relaxed)
> 0
);
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_idempotency_cache_hit() {
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
.with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let cached_result = JobResult::Success {
value: serde_json::json!({"cached": true}),
tx_hash: Some("cached_tx_hash".to_string()),
};
store
.set(
"cache_key",
Arc::new(cached_result),
Duration::from_secs(60),
)
.await
.unwrap();
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("cache_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
match result {
JobResult::Success { value, tx_hash } => {
assert_eq!(value, serde_json::json!({"cached": true}));
assert_eq!(tx_hash, Some("cached_tx_hash".to_string()));
}
_ => panic!("Expected cached success result"),
}
}
#[tokio::test]
async fn test_tool_worker_unknown_error_fallback() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "nonexistent_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await;
assert!(result.is_err());
}
#[tokio::test]
async fn test_run_loop_error_handling() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let error_queue = Arc::new(ErrorQueue::default());
let worker_clone = worker.clone();
let queue_clone = error_queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_run_loop_empty_queue() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let queue = Arc::new(InMemoryJobQueue::new());
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_run_loop_with_failing_jobs() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let fail_tool = Arc::new(MockTool {
name: "fail_tool".to_string(),
should_fail: true,
});
worker.register_tool(fail_tool).await;
let queue = Arc::new(InMemoryJobQueue::new());
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "fail_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
queue.enqueue(job).await.unwrap();
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
let metrics = worker.metrics();
assert!(
metrics
.jobs_processed
.load(std::sync::atomic::Ordering::Relaxed)
> 0
);
}
#[tokio::test]
async fn test_comprehensive_metrics_tracking() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let success_tool = Arc::new(MockTool {
name: "success_tool".to_string(),
should_fail: false,
});
let fail_tool = Arc::new(RetriableMockTool {
name: "fail_tool".to_string(),
});
worker.register_tool(success_tool).await;
worker.register_tool(fail_tool).await;
let metrics = worker.metrics();
let success_job = Job {
job_id: Uuid::new_v4(),
tool_name: "success_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(success_job).await.unwrap();
assert!(result.is_success());
assert_eq!(
metrics
.jobs_succeeded
.load(std::sync::atomic::Ordering::Relaxed),
1
);
let fail_job = Job {
job_id: Uuid::new_v4(),
tool_name: "fail_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 2,
retry_count: 0,
};
let result = worker.process_job(fail_job).await.unwrap();
assert!(!result.is_success());
assert_eq!(
metrics
.jobs_retried
.load(std::sync::atomic::Ordering::Relaxed),
1
);
assert_eq!(
metrics
.jobs_failed
.load(std::sync::atomic::Ordering::Relaxed),
1
);
}
#[tokio::test]
async fn test_debug_logging_in_retries() {
let config = ExecutionConfig {
initial_retry_delay: Duration::from_millis(1),
..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
let tool = Arc::new(MockTool {
name: "retry_tool".to_string(),
should_fail: true,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "retry_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 1,
retry_count: 0,
};
let _result = worker.process_job(job).await.unwrap();
}
#[tokio::test]
async fn test_worker_startup_logging() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "startup_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let queue = Arc::new(InMemoryJobQueue::new());
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(10)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_timeout_specific_error() {
let config = ExecutionConfig {
default_timeout: Duration::from_millis(1), ..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
let tool = Arc::new(SlowMockTool {
name: "timeout_tool".to_string(),
delay: Duration::from_millis(50),
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "timeout_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
match result {
JobResult::Failure { error, .. } => {
assert!(error.to_string().to_lowercase().contains("timed out"));
}
_ => panic!("Expected timeout failure"),
}
}
#[tokio::test]
async fn test_resource_matching_edge_cases() {
let limits = ResourceLimits::default()
.with_limit("solana_rpc", 1)
.with_limit("evm_rpc", 1);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
)
.with_resource_limits(limits);
let solana_tool = Arc::new(MockTool {
name: "solana_balance".to_string(), should_fail: false,
});
let evm_tool = Arc::new(MockTool {
name: "evm_call".to_string(), should_fail: false,
});
let web_tool = Arc::new(MockTool {
name: "web_fetch".to_string(), should_fail: false,
});
let other_tool = Arc::new(MockTool {
name: "other_operation".to_string(), should_fail: false,
});
worker.register_tool(solana_tool).await;
worker.register_tool(evm_tool).await;
worker.register_tool(web_tool).await;
worker.register_tool(other_tool).await;
let job1 = Job {
job_id: Uuid::new_v4(),
tool_name: "solana_balance".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let _result = worker.process_job(job1).await.unwrap();
let job2 = Job {
job_id: Uuid::new_v4(),
tool_name: "other_operation".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let _result = worker.process_job(job2).await.unwrap();
}
#[tokio::test]
async fn test_execution_config_custom_values() {
let config = ExecutionConfig {
max_concurrency: 25,
default_timeout: Duration::from_secs(60),
max_retries: 5,
initial_retry_delay: Duration::from_millis(200),
max_retry_delay: Duration::from_secs(20),
idempotency_ttl: Duration::from_secs(7200),
enable_idempotency: false,
};
assert_eq!(config.max_concurrency, 25);
assert_eq!(config.default_timeout, Duration::from_secs(60));
assert_eq!(config.max_retries, 5);
assert_eq!(config.initial_retry_delay, Duration::from_millis(200));
assert_eq!(config.max_retry_delay, Duration::from_secs(20));
assert_eq!(config.idempotency_ttl, Duration::from_secs(7200));
assert!(!config.enable_idempotency);
}
#[tokio::test]
async fn test_resource_limits_new() {
let limits = ResourceLimits::default();
assert!(limits.get_semaphore("any_resource").is_none());
}
#[tokio::test]
async fn test_resource_limits_with_limit_chaining() {
let limits = ResourceLimits::default()
.with_limit("first", 1)
.with_limit("second", 2)
.with_limit("third", 3);
assert!(limits.get_semaphore("first").is_some());
assert!(limits.get_semaphore("second").is_some());
assert!(limits.get_semaphore("third").is_some());
assert!(limits.get_semaphore("fourth").is_none());
}
#[tokio::test]
async fn test_tool_worker_new_custom_config() {
let config = ExecutionConfig {
max_concurrency: 5,
..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
assert_eq!(worker.config.max_concurrency, 5);
}
#[tokio::test]
async fn test_tool_worker_register_tool_replacement() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool1 = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool1).await;
assert_eq!(worker.tools.len(), 1);
let tool2 = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: true,
});
worker.register_tool(tool2).await;
assert_eq!(worker.tools.len(), 1);
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(!result.is_success());
}
#[tokio::test]
async fn test_idempotency_store_error_handling() {
let store = Arc::new(ErrorIdempotencyStore::default());
let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
.with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await;
assert!(result.is_err());
assert!(result.unwrap_err().to_string().contains("Store error"));
}
#[tokio::test]
async fn test_no_idempotency_key_no_cache_check() {
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
.with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None, max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
#[tokio::test]
async fn test_no_idempotency_store_no_cache() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
#[tokio::test]
async fn test_cache_result_with_no_idempotency_key() {
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::new(ExecutionConfig::default(), test_app_context())
.with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None, max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
#[tokio::test]
async fn test_cache_result_with_idempotency_disabled() {
let config = ExecutionConfig {
enable_idempotency: false,
..Default::default()
};
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker =
ToolWorker::new(config, test_app_context()).with_idempotency_store(store.clone());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
assert!(store.get("test_key").await.unwrap().is_none());
}
#[tokio::test]
async fn test_cache_result_with_no_store() {
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: Some("test_key".to_string()),
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await.unwrap();
assert!(result.is_success());
}
#[tokio::test]
async fn test_acquire_semaphore_web_prefix() {
let limits = ResourceLimits::default().with_limit("http_api", 1);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
)
.with_resource_limits(limits);
let _permit = worker.acquire_semaphore("web_fetch").await.unwrap();
}
#[tokio::test]
async fn test_acquire_semaphore_no_matching_resource() {
let limits = ResourceLimits::default();
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
)
.with_resource_limits(limits);
let _permit = worker.acquire_semaphore("solana_test").await.unwrap();
let _permit = worker.acquire_semaphore("random_tool").await.unwrap();
}
#[tokio::test]
async fn test_run_loop_job_processing_success_logging() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let tool = Arc::new(MockTool {
name: "log_test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let queue = Arc::new(InMemoryJobQueue::new());
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "log_test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
queue.enqueue(job).await.unwrap();
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
let metrics = worker.metrics();
assert!(
metrics
.jobs_processed
.load(std::sync::atomic::Ordering::Relaxed)
> 0
);
}
#[tokio::test]
async fn test_run_loop_worker_error_logging() {
use crate::queue::InMemoryJobQueue;
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
ExecutionConfig::default(),
test_app_context(),
);
let queue = Arc::new(InMemoryJobQueue::new());
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "nonexistent_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
queue.enqueue(job).await.unwrap();
let worker_clone = worker.clone();
let queue_clone = queue.clone();
let cancellation_token = tokio_util::sync::CancellationToken::new();
let token_clone = cancellation_token.clone();
let handle = tokio::spawn(async move { worker_clone.run(queue_clone, token_clone).await });
tokio::time::sleep(Duration::from_millis(50)).await;
cancellation_token.cancel();
handle.await.unwrap().unwrap();
}
#[tokio::test]
async fn test_semaphore_acquisition_error() {
let config = ExecutionConfig {
max_concurrency: 1,
..Default::default()
};
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(config, test_app_context());
let tool = Arc::new(MockTool {
name: "test_tool".to_string(),
should_fail: false,
});
worker.register_tool(tool).await;
let job = Job {
job_id: Uuid::new_v4(),
tool_name: "test_tool".to_string(),
params: serde_json::json!({}),
idempotency_key: None,
max_retries: 0,
retry_count: 0,
};
let result = worker.process_job(job).await;
assert!(result.is_ok());
}
#[test]
fn test_tool_trait_description_method() {
let tool = MockTool {
name: "test".to_string(),
should_fail: false,
};
assert_eq!(tool.description(), "");
}
#[test]
fn test_mock_tool_name_method() {
let tool = MockTool {
name: "test_name".to_string(),
should_fail: false,
};
assert_eq!(tool.name(), "test_name");
}
#[test]
fn test_slow_mock_tool_name_method() {
let tool = SlowMockTool {
name: "slow_test".to_string(),
delay: Duration::from_millis(1),
};
assert_eq!(tool.name(), "slow_test");
assert_eq!(tool.description(), "");
}
#[test]
fn test_error_queue_new() {
let _queue = ErrorQueue::default();
}
#[derive(Default)]
struct ErrorIdempotencyStore {
_phantom: std::marker::PhantomData<()>,
}
#[async_trait]
impl crate::idempotency::IdempotencyStore for ErrorIdempotencyStore {
async fn get(&self, _key: &str) -> anyhow::Result<Option<Arc<JobResult>>> {
Err(anyhow::anyhow!("Store error"))
}
async fn set(
&self,
_key: &str,
_result: Arc<JobResult>,
_ttl: Duration,
) -> anyhow::Result<()> {
Err(anyhow::anyhow!("Store error"))
}
async fn remove(&self, _key: &str) -> anyhow::Result<()> {
Err(anyhow::anyhow!("Store error"))
}
}
struct SlowMockTool {
name: String,
delay: Duration,
}
#[async_trait]
impl Tool for SlowMockTool {
async fn execute(
&self,
_params: serde_json::Value,
_context: &crate::provider::ApplicationContext,
) -> Result<JobResult, ToolError> {
tokio::time::sleep(self.delay).await;
Ok(JobResult::Success {
value: serde_json::json!({"result": "slow_success"}),
tx_hash: None,
})
}
fn name(&self) -> &str {
&self.name
}
fn description(&self) -> &str {
""
}
fn schema(&self) -> serde_json::Value {
serde_json::json!({
"type": "object",
"additionalProperties": true
})
}
}
#[derive(Default)]
struct ErrorQueue {
_phantom: std::marker::PhantomData<()>,
}
#[async_trait]
impl crate::queue::JobQueue for ErrorQueue {
async fn enqueue(&self, _job: crate::jobs::Job) -> anyhow::Result<()> {
Err(anyhow::anyhow!("Queue error"))
}
async fn dequeue(&self) -> anyhow::Result<Option<crate::jobs::Job>> {
Err(anyhow::anyhow!("Dequeue error"))
}
async fn dequeue_with_timeout(
&self,
_timeout: Duration,
) -> anyhow::Result<Option<crate::jobs::Job>> {
Err(anyhow::anyhow!("Dequeue timeout error"))
}
async fn len(&self) -> anyhow::Result<usize> {
Err(anyhow::anyhow!("Len error"))
}
}
}