# rrq-runner
[](https://crates.io/crates/rrq-runner)
[](https://docs.rs/rrq-runner)
[](LICENSE)
**Rust runtime for building RRQ job handlers.** Write your job handlers in Rust and let this crate handle the socket protocol, connection management, and job dispatching.
## What is RRQ?
RRQ (Reliable Redis Queue) is a distributed job queue with a Rust orchestrator and language-flexible workers. This crate lets you build high-performance job handlers in Rust that connect to the RRQ orchestrator.
**Why Rust runners?**
- **Maximum performance** - Native code for CPU-intensive tasks
- **Memory safety** - No GC pauses, predictable latency
- **Async native** - Built on Tokio for efficient concurrency
- **Same ecosystem** - Use Rust crates in your job handlers
## Installation
```toml
[dependencies]
rrq-runner = "0.9"
```
With OpenTelemetry:
```toml
[dependencies]
rrq-runner = { version = "0.9", features = ["otel"] }
```
## Quick Start
```rust
use rrq_runner::{Registry, run_tcp, parse_tcp_socket, ExecutionOutcome};
use serde_json::json;
fn main() -> Result<(), Box<dyn std::error::Error>> {
let mut registry = Registry::new();
registry.register("greet", |request| async move {
let name = request.params.get("name")
.and_then(|v| v.as_str())
.unwrap_or("World");
ExecutionOutcome::success(
request.job_id.clone(),
request.request_id.clone(),
json!({ "message": format!("Hello, {}!", name) }),
)
});
let args: Vec<String> = std::env::args().skip(1).collect();
let tcp_socket = args
.iter()
.position(|arg| arg == "--tcp-socket")
.and_then(|idx| args.get(idx + 1))
.ok_or("Missing --tcp-socket")?;
let addr = tcp_socket.as_str();
run_tcp(®istry, parse_tcp_socket(&addr)?)
}
```
## Handler Functions
Handlers receive an `ExecutionRequest` and return an `ExecutionOutcome`:
```rust
registry.register("process_order", |request| async move {
// Access job metadata
println!("Job: {}", request.job_id);
println!("Attempt: {}", request.context.attempt);
// Access parameters
let order_id = request.params.get("order_id")
.and_then(|v| v.as_str())
.ok_or("missing order_id")?;
// Do work...
ExecutionOutcome::success(
request.job_id.clone(),
request.request_id.clone(),
json!({ "processed": order_id }),
)
});
```
## Outcome Types
```rust
// Success
ExecutionOutcome::success(job_id, request_id, json!({"result": "ok"}))
// Failure (may be retried)
ExecutionOutcome::failure(job_id, request_id, "Something went wrong".to_string())
// Retry after delay
ExecutionOutcome::retry_after(job_id, request_id, "Rate limited".to_string(), 60)
// Timeout
ExecutionOutcome::timeout(job_id, request_id)
// Cancelled
ExecutionOutcome::cancelled(job_id, request_id)
```
## OpenTelemetry
```rust
use rrq_runner::{RunnerRuntime, Registry, parse_tcp_socket};
use rrq_runner::telemetry::otel::{init_tracing, OtelTelemetry};
fn main() -> Result<(), Box<dyn std::error::Error>> {
let runtime = RunnerRuntime::new()?;
// Initialize tracing
{ let _guard = runtime.enter(); init_tracing("my-runner")?; }
let mut registry = Registry::new();
registry.register("traced_job", |req| async move {
ExecutionOutcome::success(req.job_id.clone(), req.request_id.clone(), json!({}))
});
let args: Vec<String> = std::env::args().skip(1).collect();
let tcp_socket = args
.iter()
.position(|arg| arg == "--tcp-socket")
.and_then(|idx| args.get(idx + 1))
.ok_or("Missing --tcp-socket")?;
runtime.run_tcp_with(®istry, parse_tcp_socket(tcp_socket)?, &OtelTelemetry)
}
```
Configure via:
- `OTEL_EXPORTER_OTLP_TRACES_ENDPOINT` - Traces endpoint (highest precedence)
- `OTEL_EXPORTER_OTLP_METRICS_ENDPOINT` - Metrics endpoint (highest precedence)
- `OTEL_EXPORTER_OTLP_LOGS_ENDPOINT` - Logs endpoint (highest precedence)
- `OTEL_EXPORTER_OTLP_ENDPOINT` - Fallback base OTLP/HTTP endpoint (for example `http://127.0.0.1:4318`)
- `OTEL_EXPORTER_OTLP_HEADERS` and `OTEL_EXPORTER_OTLP_{TRACES|METRICS|LOGS}_HEADERS` - Comma-separated `key=value` headers
- `OTEL_SERVICE_NAME` - Service name
Endpoint resolution rules:
- Signal-specific endpoint vars take precedence over `OTEL_EXPORTER_OTLP_ENDPOINT`
- If a signal-specific endpoint is unset, RRQ falls back to `OTEL_EXPORTER_OTLP_ENDPOINT` and appends `/v1/{traces|metrics|logs}`
- If a signal-specific endpoint is explicitly set to an empty value, that signal is disabled (no fallback)
## Configuration
Add your runner to `rrq.toml`:
```toml
[rrq.runners.rust]
type = "socket"
cmd = ["./target/release/my-runner"]
tcp_socket = "127.0.0.1:9000"
pool_size = 4
max_in_flight = 10
```
## Related Crates
| [`rrq`](https://crates.io/crates/rrq) | Orchestrator |
| [`rrq-producer`](https://crates.io/crates/rrq-producer) | Enqueue jobs |
| [`rrq-protocol`](https://crates.io/crates/rrq-protocol) | Wire protocol |
## License
Apache-2.0