pub struct ToolWorker<I: IdempotencyStore + 'static> { /* private fields */ }Expand description
A worker that processes jobs from a queue using registered tools.
The ToolWorker is the core execution engine of the riglr system. It manages
registered tools, processes jobs with resilience features, and provides
comprehensive monitoring and observability.
§Key Features
- Resilient execution: Automatic retries with exponential backoff
- Resource management: Configurable limits per resource type
- Idempotency support: Cached results for safe retries
- Comprehensive monitoring: Built-in metrics and structured logging
- Concurrent processing: Efficient parallel job execution
§Architecture
The worker operates on a job-based model where:
- Jobs are dequeued from a JobQueue
- Tools are looked up by name and executed
- Results are processed with retry logic for failures
- Successful results are optionally cached for idempotency
- Metrics are updated for monitoring
§Examples
§Basic Setup
use riglr_core::{
    ToolWorker, ExecutionConfig, ResourceLimits,
    idempotency::InMemoryIdempotencyStore,
    provider::ApplicationContext
};
use riglr_config::Config;
use std::sync::Arc;
// Create worker with default configuration and application context
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
    ExecutionConfig::default(),
    app_context
);
// Add idempotency store for safe retries
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = worker.with_idempotency_store(store);
// Configure resource limits
let limits = ResourceLimits::default()
.with_limit("solana_rpc", 3)
.with_limit("evm_rpc", 5);
let worker = worker.with_resource_limits(limits);
§Processing Jobs
use riglr_core::{ToolWorker, Job, Tool, JobResult, ExecutionConfig};
use riglr_core::idempotency::InMemoryIdempotencyStore;
use riglr_core::provider::ApplicationContext;
use riglr_config::Config;
use async_trait::async_trait;
use std::sync::Arc;
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
    ExecutionConfig::default(),
    app_context
);
// Register tools
worker.register_tool(Arc::new(ExampleTool)).await;
// Process a job
let job = Job::new("example", &serde_json::json!({}), 3)?;
let result = worker.process_job(job).await.unwrap();
println!("Job result: {:?}", result);
// Check metrics
let metrics = worker.metrics();
println!("Jobs processed: {}",
metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));
Implementations§
Source§impl<I: IdempotencyStore + 'static> ToolWorker<I>
impl<I: IdempotencyStore + 'static> ToolWorker<I>
Sourcepub fn new(config: ExecutionConfig, app_context: ApplicationContext) -> Self
pub fn new(config: ExecutionConfig, app_context: ApplicationContext) -> Self
Create a new tool worker with the given configuration and application context.
This creates a worker ready to process jobs, but no tools are registered yet.
Use register_tool() to add tools before processing jobs.
§Parameters
config - Execution configuration controlling retry behavior, timeouts, etc.
app_context - Application context providing access to RPC providers and shared resources
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use std::time::Duration;
use riglr_config::Config;
let exec_config = ExecutionConfig {
max_concurrency: 20,
default_timeout: Duration::from_secs(60),
max_retries: 5,
..Default::default()
};
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(exec_config, app_context);
Source§pub fn with_idempotency_store(self, store: Arc<I>) -> Self
pub fn with_idempotency_store(self, store: Arc<I>) -> Self
Configure an idempotency store for result caching.
When an idempotency store is configured, jobs with idempotency keys will have their results cached. Subsequent executions with the same idempotency key will return the cached result instead of re-executing.
§Parameters
store- The idempotency store implementation to use
§Returns
Self, for method chaining
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
use std::sync::Arc;
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context)
    .with_idempotency_store(store);
Source§pub fn with_resource_limits(self, limits: ResourceLimits) -> Self
pub fn with_resource_limits(self, limits: ResourceLimits) -> Self
Configure custom resource limits.
Resource limits control how many concurrent operations can run for each resource type. This prevents overwhelming external services and provides fine-grained control over resource usage.
§Parameters
limits- The resource limits configuration to use
§Returns
Self, for method chaining
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, ResourceLimits, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
let limits = ResourceLimits::default()
    .with_limit("solana_rpc", 3)
    .with_limit("evm_rpc", 8)
    .with_limit("http_api", 15);
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context)
    .with_resource_limits(limits);
Source§pub async fn register_tool(&self, tool: Arc<dyn Tool>)
pub async fn register_tool(&self, tool: Arc<dyn Tool>)
Register a tool with this worker.
Tools must be registered before they can be executed by jobs. Each tool is indexed by its name, so tool names must be unique within a single worker.
§Parameters
tool- The tool implementation to register
§Panics
This method does not panic, but if a tool with the same name is already registered, it will be replaced.
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, Tool, JobResult, ToolError, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use async_trait::async_trait;
use std::sync::Arc;
struct CalculatorTool;
#[async_trait]
impl Tool for CalculatorTool {
async fn execute(&self, params: serde_json::Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
let a = params["a"].as_f64().unwrap_or(0.0);
let b = params["b"].as_f64().unwrap_or(0.0);
Ok(JobResult::success(&(a + b))?)
}
fn name(&self) -> &str {
"calculator"
}
}
let config = riglr_config::Config::from_env();
let app_context = riglr_core::provider::ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
worker.register_tool(Arc::new(CalculatorTool)).await;
Source§pub fn metrics(&self) -> &WorkerMetrics
pub fn metrics(&self) -> &WorkerMetrics
Get access to worker metrics.
The returned metrics can be used for monitoring worker performance and health. All metrics are thread-safe and can be read at any time.
§Returns
A reference to the worker’s metrics
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
use std::sync::atomic::Ordering;
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
let metrics = worker.metrics();
println!("Jobs processed: {}",
metrics.jobs_processed.load(Ordering::Relaxed));
println!("Success rate: {:.2}%", {
let processed = metrics.jobs_processed.load(Ordering::Relaxed);
let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 }
});
Source§pub async fn process_job(&self, job: Job) -> Result<JobResult, WorkerError>
pub async fn process_job(&self, job: Job) -> Result<JobResult, WorkerError>
Process a single job with all resilience features.
Returns:
Ok(JobResult) - The job was processed (successfully or with business logic failure)
Err(WorkerError) - System-level worker failure (tool not found, semaphore issues, etc.)
Sourcepub async fn run<Q: JobQueue>(
&self,
queue: Arc<Q>,
cancellation_token: CancellationToken,
) -> Result<(), Box<dyn Error + Send + Sync>>
pub async fn run<Q: JobQueue>( &self, queue: Arc<Q>, cancellation_token: CancellationToken, ) -> Result<(), Box<dyn Error + Send + Sync>>
Start the worker loop, processing jobs from the given queue.
The worker will continue processing jobs until the provided cancellation token is cancelled. This allows for graceful shutdown where in-flight jobs can complete before the worker stops.
§Parameters
queue - The job queue to process jobs from
cancellation_token - Token to signal when the worker should stop
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
use riglr_core::provider::ApplicationContext;
use riglr_core::queue::InMemoryJobQueue;
use riglr_config::ConfigBuilder;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;
let config = ConfigBuilder::default().build().unwrap();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
let queue = Arc::new(InMemoryJobQueue::new());
let cancellation_token = CancellationToken::new();
// Start worker in background
let token_clone = cancellation_token.clone();
let worker_handle = tokio::spawn(async move {
worker.run(queue, token_clone).await
});
// Later, signal shutdown
cancellation_token.cancel();
// Await the task and ignore the inner result for simplicity in docs
let _ = worker_handle.await;