rrq-runner

A Rust runtime for building RRQ job runners. This crate handles the socket protocol, connection management, and job dispatching—you just implement your handlers.
Features
- TCP socket support for orchestrator communication
- Concurrent job execution with configurable parallelism
- Graceful cancellation of in-flight jobs
- OpenTelemetry integration for distributed tracing (optional)
- Handler registry for routing jobs to functions
- Async/await native with Tokio runtime
Installation
[dependencies]
rrq-runner = "0.9"
With OpenTelemetry tracing:
[dependencies]
rrq-runner = { version = "0.9", features = ["otel"] }
Quick Start
Create a simple runner with one handler:
use rrq_runner::{parse_tcp_socket, run_tcp, ExecutionOutcome, Registry};
use serde_json::json;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut registry = Registry::new();
registry.register("greet", |request| async move {
let name = request.args.get(0)
.and_then(|v| v.as_str())
.unwrap_or("World");
ExecutionOutcome::success(
request.job_id.clone(),
request.request_id.clone(),
json!({ "message": format!("Hello, {}!", name) }),
)
});
let addr = std::env::var("RRQ_RUNNER_TCP_SOCKET")?;
let socket_addr = parse_tcp_socket(&addr)?;
run_tcp(®istry, socket_addr)
}
Handler Functions
Handlers receive an ExecutionRequest and return an ExecutionOutcome:
use rrq_runner::{ExecutionOutcome, Registry};
use rrq_protocol::ExecutionRequest;
let mut registry = Registry::new();
registry.register("process_order", |request: ExecutionRequest| async move {
println!("Job ID: {}", request.job_id);
println!("Attempt: {}", request.context.attempt);
println!("Queue: {}", request.context.queue_name);
let order_id = request.args.get(0)
.and_then(|v| v.as_str())
.ok_or("missing order_id")?;
let priority = request.kwargs.get("priority")
.and_then(|v| v.as_str())
.unwrap_or("normal");
ExecutionOutcome::success(
request.job_id.clone(),
request.request_id.clone(),
serde_json::json!({ "processed": order_id }),
)
});
Outcome Types
Return different outcomes based on execution result:
use rrq_runner::ExecutionOutcome;
ExecutionOutcome::success(job_id, request_id, json!({"result": "ok"}))
ExecutionOutcome::failure(job_id, request_id, "Something went wrong".to_string())
ExecutionOutcome::retry_after(job_id, request_id, "Rate limited".to_string(), 60)
ExecutionOutcome::timeout(job_id, request_id)
ExecutionOutcome::cancelled(job_id, request_id)
Custom Runtime
For more control, create your own RunnerRuntime:
use rrq_runner::{RunnerRuntime, Registry, ENV_RUNNER_TCP_SOCKET, parse_tcp_socket};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RunnerRuntime::new()?;
let mut registry = Registry::new();
registry.register("echo", |req| async move {
ExecutionOutcome::success(
req.job_id.clone(),
req.request_id.clone(),
serde_json::json!(req.args),
)
});
let addr = std::env::var(ENV_RUNNER_TCP_SOCKET)?;
let socket_addr = parse_tcp_socket(&addr)?;
runtime.run_tcp(®istry, socket_addr)
}
OpenTelemetry Tracing
Enable the otel feature for distributed tracing:
use rrq_runner::{RunnerRuntime, Registry, ENV_RUNNER_TCP_SOCKET, parse_tcp_socket};
use rrq_runner::telemetry::otel::{init_tracing, OtelTelemetry};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RunnerRuntime::new()?;
{
let _guard = runtime.enter();
init_tracing("my-runner")?;
}
let telemetry = OtelTelemetry;
let mut registry = Registry::new();
registry.register("traced_handler", |request| async move {
ExecutionOutcome::success(
request.job_id.clone(),
request.request_id.clone(),
serde_json::json!({"traced": true}),
)
});
let addr = std::env::var(ENV_RUNNER_TCP_SOCKET)?;
let socket_addr = parse_tcp_socket(&addr)?;
runtime.run_tcp_with(®istry, socket_addr, &telemetry)
}
Configure via standard OpenTelemetry environment variables:
OTEL_EXPORTER_OTLP_ENDPOINT - OTLP collector endpoint
OTEL_SERVICE_NAME - Service name for traces
Configuration in rrq.toml
Point the RRQ orchestrator to your runner:
[rrq.runners.rust]
type = "socket"
cmd = ["./target/release/my-runner"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4
max_in_flight = 10
Environment Variables
Set by the orchestrator when spawning runners:
| Variable |
Description |
RRQ_RUNNER_TCP_SOCKET |
TCP socket address for communication |
Example Project Structure
my-runner/
├── Cargo.toml
└── src/
└── main.rs
[package]
name = "my-runner"
version = "0.1.0"
edition = "2021"
[[bin]]
name = "my-runner"
[dependencies]
rrq-runner = "0.9"
serde_json = "1.0"
Related Crates
License
Apache-2.0