Struct ToolWorker

pub struct ToolWorker<I: IdempotencyStore + 'static> { /* private fields */ }

A worker that processes jobs from a queue using registered tools.

The ToolWorker is the core execution engine of the riglr system. It manages registered tools, processes jobs with resilience features, and provides comprehensive monitoring and observability.

§Key Features

  • Resilient execution: Automatic retries with exponential backoff
  • Resource management: Configurable limits per resource type
  • Idempotency support: Cached results for safe retries
  • Comprehensive monitoring: Built-in metrics and structured logging
  • Concurrent processing: Efficient parallel job execution

§Architecture

The worker operates on a job-based model (see the sketch after this list) where:

  1. Jobs are dequeued from a JobQueue
  2. Tools are looked up by name and executed
  3. Results are processed with retry logic for failures
  4. Successful results are optionally cached for idempotency
  5. Metrics are updated for monitoring
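
A minimal sketch, assuming configuration is loaded from the environment, that maps these steps onto the public API (the MyTool registration is a placeholder shown only as a comment):

use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
use riglr_core::provider::ApplicationContext;
use riglr_core::queue::InMemoryJobQueue;
use riglr_config::Config;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);

// Register tools up front so step 2 can resolve them by name.
// worker.register_tool(Arc::new(MyTool)).await; // `MyTool` is a placeholder Tool impl

// Step 1: jobs arrive on a shared JobQueue (producers enqueue them elsewhere).
let queue = Arc::new(InMemoryJobQueue::new());

// Steps 2-5 run inside the worker loop: tool lookup, retried execution,
// optional idempotency caching, and metrics updates, until cancellation.
worker.run(queue, CancellationToken::new()).await?;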

§Examples

§Basic Setup

use riglr_core::{
    ToolWorker, ExecutionConfig, ResourceLimits,
    idempotency::InMemoryIdempotencyStore,
    provider::ApplicationContext,
};
use riglr_config::Config;
use std::sync::Arc;

// Build the application context from configuration
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);

// Create worker with default configuration
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
    ExecutionConfig::default(),
    app_context,
);

// Add idempotency store for safe retries
let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = worker.with_idempotency_store(store);

// Configure resource limits
let limits = ResourceLimits::default()
    .with_limit("solana_rpc", 3)
    .with_limit("evm_rpc", 5);
let worker = worker.with_resource_limits(limits);

§Processing Jobs

use riglr_core::{ToolWorker, Job, Tool, JobResult, ExecutionConfig};
use riglr_core::idempotency::InMemoryIdempotencyStore;
use riglr_core::provider::ApplicationContext;
use riglr_config::Config;
use async_trait::async_trait;
use std::sync::Arc;

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(
    ExecutionConfig::default(),
    app_context,
);

// Register tools (`ExampleTool` is a user-defined `Tool` implementation whose definition is elided)
worker.register_tool(Arc::new(ExampleTool)).await;

// Process a job
let job = Job::new("example", &serde_json::json!({}), 3)?;
let result = worker.process_job(job).await.unwrap();

println!("Job result: {:?}", result);

// Check metrics
let metrics = worker.metrics();
println!("Jobs processed: {}",
    metrics.jobs_processed.load(std::sync::atomic::Ordering::Relaxed));

Implementations§

impl<I: IdempotencyStore + 'static> ToolWorker<I>

pub fn new(config: ExecutionConfig, app_context: ApplicationContext) -> Self

Create a new tool worker with the given configuration and application context.

This creates a worker ready to process jobs, but no tools are registered yet. Use register_tool() to add tools before processing jobs.

§Parameters
  • config - Execution configuration controlling retry behavior, timeouts, etc.
  • app_context - Application context providing access to RPC providers and shared resources
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use std::time::Duration;
use riglr_config::Config;

let exec_config = ExecutionConfig {
    max_concurrency: 20,
    default_timeout: Duration::from_secs(60),
    max_retries: 5,
    ..Default::default()
};
let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);

let worker = ToolWorker::<InMemoryIdempotencyStore>::new(exec_config, app_context);

pub fn with_idempotency_store(self, store: Arc<I>) -> Self

Configure an idempotency store for result caching.

When an idempotency store is configured, jobs with idempotency keys will have their results cached. Subsequent executions with the same idempotency key will return the cached result instead of re-executing.

§Parameters
  • store - The idempotency store implementation to use
§Returns

Self, for method chaining

§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
use std::sync::Arc;

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);

let store = Arc::new(InMemoryIdempotencyStore::new());
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context)
    .with_idempotency_store(store);

pub fn with_resource_limits(self, limits: ResourceLimits) -> Self

Configure custom resource limits.

Resource limits control how many concurrent operations can run for each resource type. This prevents overwhelming external services and provides fine-grained control over resource usage.

§Parameters
  • limits - The resource limits configuration to use
§Returns

Self, for method chaining

§Examples
use riglr_core::{ToolWorker, ExecutionConfig, ResourceLimits, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);

let limits = ResourceLimits::default()
    .with_limit("solana_rpc", 3)
    .with_limit("evm_rpc", 8)
    .with_limit("http_api", 15);

let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context)
    .with_resource_limits(limits);

pub async fn register_tool(&self, tool: Arc<dyn Tool>)

Register a tool with this worker.

Tools must be registered before they can be executed by jobs. Each tool is indexed by its name, so tool names must be unique within a single worker.

§Parameters
  • tool - The tool implementation to register
§Panics

This method does not panic. If a tool with the same name is already registered, the existing registration is silently replaced.

§Examples
use riglr_core::{ToolWorker, ExecutionConfig, Tool, JobResult, ToolError, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
use async_trait::async_trait;
use std::sync::Arc;

struct CalculatorTool;

#[async_trait]
impl Tool for CalculatorTool {
    async fn execute(&self, params: serde_json::Value, _context: &ApplicationContext) -> Result<JobResult, ToolError> {
        let a = params["a"].as_f64().unwrap_or(0.0);
        let b = params["b"].as_f64().unwrap_or(0.0);
        Ok(JobResult::success(&(a + b))?)
    }

    fn name(&self) -> &str {
        "calculator"
    }
}

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
worker.register_tool(Arc::new(CalculatorTool)).await;

pub fn metrics(&self) -> &WorkerMetrics

Get access to worker metrics.

The returned metrics can be used for monitoring worker performance and health. All metrics are thread-safe and can be read at any time.

§Returns

A reference to the worker’s metrics

§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore, provider::ApplicationContext};
use riglr_config::Config;
use std::sync::atomic::Ordering;

let config = Config::from_env();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
let metrics = worker.metrics();

println!("Jobs processed: {}",
    metrics.jobs_processed.load(Ordering::Relaxed));
println!("Success rate: {:.2}%", {
    let processed = metrics.jobs_processed.load(Ordering::Relaxed);
    let succeeded = metrics.jobs_succeeded.load(Ordering::Relaxed);
    if processed > 0 { (succeeded as f64 / processed as f64) * 100.0 } else { 0.0 }
});

pub async fn process_job(&self, job: Job) -> Result<JobResult, WorkerError>

Process a single job with all resilience features.

§Returns
  • Ok(JobResult) - The job was processed, either successfully or with a business-logic failure reported inside the result
  • Err(WorkerError) - A system-level worker failure (tool not found, semaphore issues, etc.)
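§Examples
A minimal handling sketch, assuming a tool has been registered and a job has been built as in the examples above:

match worker.process_job(job).await {
    // The job ran; business-logic failures are carried inside the JobResult itself.
    Ok(result) => println!("Job finished: {:?}", result),
    // Worker-level system failure, e.g. no tool registered under the job's name.
    Err(err) => eprintln!("Worker error: {:?}", err),
}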

pub async fn run<Q: JobQueue>(
    &self,
    queue: Arc<Q>,
    cancellation_token: CancellationToken,
) -> Result<(), Box<dyn Error + Send + Sync>>

Start the worker loop, processing jobs from the given queue.

The worker will continue processing jobs until the provided cancellation token is cancelled. This allows for graceful shutdown where in-flight jobs can complete before the worker stops.

§Parameters
  • queue - The job queue to process jobs from
  • cancellation_token - Token to signal when the worker should stop
§Examples
use riglr_core::{ToolWorker, ExecutionConfig, idempotency::InMemoryIdempotencyStore};
use riglr_core::provider::ApplicationContext;
use riglr_core::queue::InMemoryJobQueue;
use riglr_config::ConfigBuilder;
use tokio_util::sync::CancellationToken;
use std::sync::Arc;

let config = ConfigBuilder::default().build().unwrap();
let app_context = ApplicationContext::from_config(&config);
let worker = ToolWorker::<InMemoryIdempotencyStore>::new(ExecutionConfig::default(), app_context);
let queue = Arc::new(InMemoryJobQueue::new());
let cancellation_token = CancellationToken::new();

// Start worker in background
let token_clone = cancellation_token.clone();
let worker_handle = tokio::spawn(async move {
    worker.run(queue, token_clone).await
});

// Later, signal shutdown
cancellation_token.cancel();
// Await the task and ignore the inner result for simplicity in docs
let _ = worker_handle.await;

Trait Implementations§

impl<I: IdempotencyStore + 'static> Clone for ToolWorker<I>

fn clone(&self) -> Self

Returns a duplicate of the value.

fn clone_from(&mut self, source: &Self)

Performs copy-assignment from source.

impl<I: IdempotencyStore + 'static> Debug for ToolWorker<I>

fn fmt(&self, f: &mut Formatter<'_>) -> Result

Formats the value using the given formatter.

Auto Trait Implementations§

impl<I> Freeze for ToolWorker<I>

impl<I> !RefUnwindSafe for ToolWorker<I>

impl<I> Send for ToolWorker<I>

impl<I> Sync for ToolWorker<I>

impl<I> Unpin for ToolWorker<I>

impl<I> !UnwindSafe for ToolWorker<I>

Blanket Implementations§

impl<T> Any for T
where T: 'static + ?Sized,

fn type_id(&self) -> TypeId

Gets the TypeId of self.

impl<T> Borrow<T> for T
where T: ?Sized,

fn borrow(&self) -> &T

Immutably borrows from an owned value.

impl<T> BorrowMut<T> for T
where T: ?Sized,

fn borrow_mut(&mut self) -> &mut T

Mutably borrows from an owned value.

impl<T> CloneToUninit for T
where T: Clone,

unsafe fn clone_to_uninit(&self, dest: *mut u8)

🔬This is a nightly-only experimental API. (clone_to_uninit)
Performs copy-assignment from self to dest.

impl<T> From<T> for T

fn from(t: T) -> T

Returns the argument unchanged.

impl<T> Instrument for T

fn instrument(self, span: Span) -> Instrumented<Self>

Instruments this type with the provided Span, returning an Instrumented wrapper.

fn in_current_span(self) -> Instrumented<Self>

Instruments this type with the current Span, returning an Instrumented wrapper.

impl<T, U> Into<U> for T
where U: From<T>,

fn into(self) -> U

Calls U::from(self).

That is, this conversion is whatever the implementation of From<T> for U chooses to do.

impl<T> ToOwned for T
where T: Clone,

type Owned = T

The resulting type after obtaining ownership.

fn to_owned(&self) -> T

Creates owned data from borrowed data, usually by cloning.

fn clone_into(&self, target: &mut T)

Uses borrowed data to replace owned data, usually by cloning.

impl<T, U> TryFrom<U> for T
where U: Into<T>,

type Error = Infallible

The type returned in the event of a conversion error.

fn try_from(value: U) -> Result<T, <T as TryFrom<U>>::Error>

Performs the conversion.

impl<T, U> TryInto<U> for T
where U: TryFrom<T>,

type Error = <U as TryFrom<T>>::Error

The type returned in the event of a conversion error.

fn try_into(self) -> Result<U, <U as TryFrom<T>>::Error>

Performs the conversion.

impl<V, T> VZip<V> for T
where V: MultiLane<T>,

fn vzip(self) -> V

impl<T> WithSubscriber for T

fn with_subscriber<S>(self, subscriber: S) -> WithDispatch<Self>
where S: Into<Dispatch>,

Attaches the provided Subscriber to this type, returning a WithDispatch wrapper.

fn with_current_subscriber(self) -> WithDispatch<Self>

Attaches the current default Subscriber to this type, returning a WithDispatch wrapper.

impl<T> ErasedDestructor for T
where T: 'static,