Core tool trait and execution infrastructure: the tool orchestration and execution framework.
This module provides the ToolWorker abstraction for coordinating tool execution with proper error handling, logging, and integration with rig agents. It exposes the core abstractions for executing tools in a resilient, asynchronous manner, with support for retries, timeouts, and job queuing.
§Overview
The tool execution system is designed around several key components:
- Tool - The core trait defining executable tools
- ToolWorker - The execution engine that processes jobs using registered tools
- ExecutionConfig - Configuration for retry behavior, timeouts, and resource limits
- ResourceLimits - Fine-grained control over resource usage per tool type
- WorkerMetrics - Performance and operational metrics for monitoring
§Key Features
§Resilient Execution
- Exponential backoff retry logic with configurable delays
- Timeout protection to prevent hanging operations
- Error classification to distinguish retriable vs. permanent failures (see the sketch after this list)
- Idempotency support to safely retry operations
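For example, a tool that calls an external API can tell the worker which failures are worth retrying. A minimal sketch, assuming a ToolError::retriable_string constructor analogous to the ToolError::permanent_string constructor used in the examples below; the classify_http_status helper is purely illustrative:

use riglr_core::ToolError;

// Hypothetical helper mapping an upstream HTTP status to an error classification.
fn classify_http_status(status: u16) -> Result<(), ToolError> {
    match status {
        200..=299 => Ok(()),
        // Rate limiting and server errors are transient; the worker's exponential
        // backoff can safely retry them (assumes a retriable_string constructor).
        429 | 500..=599 => Err(ToolError::retriable_string(
            "Upstream returned a transient error",
            "Temporary upstream failure; safe to retry",
        )),
        // Remaining client errors will not succeed on retry; fail permanently.
        _ => Err(ToolError::permanent_string(
            "Upstream rejected the request",
            "Permanent upstream failure",
        )),
    }
}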
§Resource Management
- Semaphore-based limits for different resource types (RPC calls, HTTP requests)
- Concurrent execution with configurable limits per resource
- Tool-specific resource mapping based on naming patterns (see the sketch after this list)
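A minimal sketch of declaring limits, assuming (purely as illustration) that the name-based mapping would route a tool named "solana_transfer" to the "solana_rpc" pool:

use riglr_core::ResourceLimits;

fn main() {
    // Each limit behaves like a semaphore: at most N jobs holding a permit for a
    // resource run concurrently; additional jobs wait until a permit is released.
    let _limits = ResourceLimits::default()
        .with_limit("solana_rpc", 5)   // e.g. tools whose names match a Solana pattern
        .with_limit("http_api", 20);   // generic HTTP-backed tools
}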
§Monitoring and Observability
- Comprehensive metrics tracking success/failure rates (see the sketch after this list)
- Structured logging with correlation IDs
- Performance monitoring with execution timings
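A minimal sketch of a periodic metrics report, assuming a Tokio runtime and that WorkerMetrics is re-exported at the crate root; report_metrics is an illustrative name, and metrics is the value returned by worker.metrics():

use riglr_core::WorkerMetrics;
use std::sync::atomic::Ordering;
use std::time::Duration;

// Periodically log how many jobs the worker has processed.
// The counters are atomics, so reads are cheap and lock-free.
async fn report_metrics(metrics: &WorkerMetrics) {
    let mut ticker = tokio::time::interval(Duration::from_secs(30));
    loop {
        ticker.tick().await;
        let processed = metrics.jobs_processed.load(Ordering::Relaxed);
        println!("jobs processed so far: {}", processed);
    }
}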
§Examples
§Basic Tool Implementation
use riglr_core::{Tool, ToolError, JobResult, ApplicationContext};
use async_trait::async_trait;
use serde_json::Value;

// ApplicationContext and ToolError are assumed to be re-exported from the crate root.
struct SimpleCalculator;

#[async_trait]
impl Tool for SimpleCalculator {
    async fn execute(&self, params: Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
        // Missing parameters are permanent failures: retrying cannot supply them.
        let a = params["a"].as_f64().ok_or_else(|| {
            ToolError::permanent_string("Missing parameter 'a'", "Parameter 'a' is required")
        })?;
        let b = params["b"].as_f64().ok_or_else(|| {
            ToolError::permanent_string("Missing parameter 'b'", "Parameter 'b' is required")
        })?;
        let operation = params["op"].as_str().unwrap_or("add");

        let result = match operation {
            "add" => a + b,
            "multiply" => a * b,
            "divide" => {
                if b == 0.0 {
                    return Err(ToolError::permanent_string("Division by zero", "Cannot divide by zero"));
                }
                a / b
            }
            _ => return Err(ToolError::permanent_string("Unknown operation", "Operation not supported")),
        };

        Ok(JobResult::success(&result)?)
    }

    fn name(&self) -> &str {
        "calculator"
    }
}

§Worker Setup and Execution
use riglr_core::{
    ToolWorker, ExecutionConfig, ResourceLimits, Job,
    idempotency::InMemoryIdempotencyStore,
};
use std::{sync::Arc, time::Duration};

// A Tokio async runtime is assumed; SimpleCalculator is the tool defined above.
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Configure execution behavior
    let config = ExecutionConfig {
        max_concurrency: 20,
        default_timeout: Duration::from_secs(60),
        max_retries: 3,
        initial_retry_delay: Duration::from_millis(100),
        max_retry_delay: Duration::from_secs(30),
        enable_idempotency: true,
        ..Default::default()
    };

    // Set up per-resource concurrency limits
    let limits = ResourceLimits::default()
        .with_limit("solana_rpc", 5)   // Limit concurrent Solana RPC calls
        .with_limit("evm_rpc", 10)     // Limit concurrent EVM RPC calls
        .with_limit("http_api", 20);   // Limit concurrent HTTP API calls

    // Create a worker with an idempotency store
    let store = Arc::new(InMemoryIdempotencyStore::new());
    let worker = ToolWorker::new(config)
        .with_idempotency_store(store)
        .with_resource_limits(limits);

    // Register tools
    worker.register_tool(Arc::new(SimpleCalculator)).await;

    // Process a job
    let job = Job::new_idempotent(
        "calculator",
        &serde_json::json!({"a": 10, "b": 5, "op": "add"}),
        3,                // max retries
        "calc_10_plus_5", // idempotency key
    )?;
    let result = worker.process_job(job).await.unwrap();
    println!("Result: {:?}", result);

    // Get metrics
    let metrics = worker.metrics();
    println!(
        "Jobs processed: {}",
        metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed)
    );

    Ok(())
}

Structs§
- ExecutionConfig - Configuration for tool execution behavior.
- ResourceLimits - Resource limits configuration for fine-grained concurrency control.
- ToolWorker - A worker that processes jobs from a queue using registered tools.
- WorkerMetrics - Performance and operational metrics for monitoring worker health.
Traits§
- Tool - A trait defining the execution interface for tools.