§tokio-prompt-orchestrator
A production-grade orchestrator for multi-stage LLM inference pipelines over Tokio.
Provides a five-stage directed pipeline with bounded MPSC channels, backpressure, request deduplication, circuit breakers, retry logic, rate limiting, Prometheus metrics, OpenTelemetry tracing, and an optional autonomous self-improving control loop.
§Architecture
Five-stage pipeline with bounded channels and backpressure:
PromptRequest -> RAG(512) -> Assemble(512) -> Inference(1024) -> Post(512) -> Stream(256)

Each stage runs as an independent tokio::task. When a downstream channel fills, send_with_shed drops the incoming item and records it in the DeadLetterQueue rather than blocking the upstream stage.
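The shedding semantics can be sketched with a bounded std channel (illustrative only; the real send_with_shed operates on tokio mpsc channels and returns a SendOutcome, and send_or_shed below is a hypothetical stand-in):

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Hypothetical stand-in for send_with_shed: attempt a non-blocking send and,
// if the channel is full (or closed), record the item in a dead-letter buffer
// instead of blocking the upstream stage.
fn send_or_shed<T>(tx: &SyncSender<T>, item: T, dlq: &mut Vec<T>) -> bool {
    match tx.try_send(item) {
        Ok(()) => true,
        Err(TrySendError::Full(item)) | Err(TrySendError::Disconnected(item)) => {
            dlq.push(item); // shed, but keep the request for inspection/replay
            false
        }
    }
}

fn main() {
    let (tx, _rx) = sync_channel::<u32>(2); // bounded capacity of 2
    let mut dlq = Vec::new();
    assert!(send_or_shed(&tx, 1, &mut dlq));
    assert!(send_or_shed(&tx, 2, &mut dlq));
    assert!(!send_or_shed(&tx, 3, &mut dlq)); // channel full -> shed
    assert_eq!(dlq, vec![3]);
}
```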
§Quick Start
use std::collections::HashMap;
use std::sync::Arc;
use tokio_prompt_orchestrator::{spawn_pipeline, EchoWorker, PromptRequest, SessionId, ModelWorker};
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let worker: Arc<dyn ModelWorker> = Arc::new(EchoWorker::new());
    let handles = spawn_pipeline(worker);
    handles.input_tx.send(PromptRequest {
        session: SessionId::new("demo"),
        request_id: "req-1".to_string(),
        input: "Hello, pipeline!".to_string(),
        meta: HashMap::new(),
        deadline: None,
    }).await?;
    let mut guard = handles.output_rx.lock().await;
    if let Some(rx) = guard.as_mut() {
        if let Some(output) = rx.recv().await {
            println!("{}", output.text);
        }
    }
    Ok(())
}

§Module Organisation
| Module | Description |
|---|---|
| stages | Five pipeline stage implementations and channel wiring |
| worker | ModelWorker trait and five production implementations |
| enhanced | Resilience primitives: circuit breaker, dedup, retry, cache, rate limiter |
| metrics | Prometheus metrics initialisation and helper functions |
| config | TOML-deserialisable PipelineConfig with hot-reload support |
| routing | ModelRouter for complexity-scored local/cloud routing |
| coordination | Agent fleet management and task claiming |
| self_tune | PID controllers and telemetry bus (feature: self-tune) |
| self_modify | Task generation and validation gate (feature: self-modify) |
| intelligence | Learned router and autoscaler (feature: intelligence) |
| evolution | A/B experiments and snapshot rollback (feature: evolution) |
| self_improve | Wired self-improving loop (features: self-tune+self-modify+intelligence) |
| web_api | REST/SSE/WebSocket server (feature: web-api) |
| distributed | Redis dedup and NATS coordination (feature: distributed) |
| tui | Ratatui terminal dashboard (feature: tui) |
§tokio-prompt-orchestrator
Multi-core, Tokio-native orchestration for LLM inference pipelines. Built with bounded backpressure channels, circuit breakers, request deduplication, and an optional autonomous self-tuning control loop.
§Architecture
The pipeline is a five-stage directed acyclic graph of bounded async channels. Each stage runs as an independent Tokio task. Backpressure propagates upstream when a downstream channel fills, and excess requests are shed gracefully to a dead-letter queue rather than blocking the pipeline.
[Prompt Submissions] -> [Dedup Stage] -> [Circuit Breaker] -> [Rate Limiter]
                                                                    |
                                                           [Worker Pool (Tokio)]
                                                                    |
                                                    [LLM Providers (Anthropic/OpenAI)]
                                                                    |
                                                      [Self-Improving Control Loop]
                                                                    |
[Web API (HTTP/WS)]  [Prometheus Metrics] <- [OpenTelemetry] <- [Results]

                      +------------------+
PromptRequest ------> |    RAG Stage     | cap: 512
                      |  (context fetch) |
                      +--------+---------+
                               |
                      +--------v---------+
                      |  Assemble Stage  | cap: 512
                      | (prompt builder) |
                      +--------+---------+
                               |
                      +--------v---------+
                      | Inference Stage  | cap: 1024
                      |  (model worker)  |
                      +--------+---------+
                               |
                      +--------v---------+
                      |    Post Stage    | cap: 512
                      |  (filter/format) |
                      +--------+---------+
                               |
                      +--------v---------+
                      |   Stream Stage   | cap: 256
                      |   (output sink)  |
                      +------------------+

Resilience layers applied at the inference stage:
- Deduplication: in-flight requests with identical prompts are coalesced into a single inference call; all waiting callers receive the same result.
- Circuit breaker: opens on consecutive failures, enters half-open probe mode after a configurable timeout.
- Retry with exponential backoff and jitter.
- Rate limiter: token-bucket guard at the pipeline entry point.
- Dead-letter queue: shed requests are stored in a ring buffer for inspection and replay.
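The retry schedule behind the backoff layer can be sketched as follows (illustrative only; backoff_delay_ms is a hypothetical helper, and the crate's RetryPolicy additionally applies randomized jitter, omitted here for determinism):

```rust
// Compute the nth retry delay: base * 2^attempt, capped at max.
// (Hypothetical helper; the real RetryPolicy also adds random jitter.)
fn backoff_delay_ms(attempt: u32, base_ms: u64, max_ms: u64) -> u64 {
    base_ms
        .saturating_mul(1u64 << attempt.min(20)) // bound the shift to avoid overflow
        .min(max_ms)
}

fn main() {
    // With retry_base_ms = 100 and retry_max_ms = 5000:
    assert_eq!(backoff_delay_ms(0, 100, 5000), 100);  // first retry
    assert_eq!(backoff_delay_ms(3, 100, 5000), 800);  // doubles each attempt
    assert_eq!(backoff_delay_ms(10, 100, 5000), 5000); // capped at retry_max_ms
}
```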
§Quickstart
§Prebuilt binary (no Rust required)
Download orchestrator.exe from the releases page and run it. The first launch runs an interactive setup wizard that saves your API key and provider preference to orchestrator.env.
Which AI provider do you want to use?
1) Anthropic (Claude)
2) OpenAI (GPT-4o)
3) llama.cpp (local, no key)
4) echo (offline test mode)
Enter 1, 2, 3, or 4 [1]:

After setup the orchestrator starts a terminal REPL and a web API on http://127.0.0.1:8080 simultaneously. You can type prompts in the terminal while agents and IDEs connect over HTTP in the background.
§Library usage
Add the dependency:
[dependencies]
tokio-prompt-orchestrator = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Minimal async example using the echo worker (no API key required):
use std::collections::HashMap;
use tokio_prompt_orchestrator::{
spawn_pipeline, EchoWorker, PromptRequest, SessionId,
};
use std::sync::Arc;
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Spawn the five-stage pipeline with the echo worker.
    // Swap EchoWorker for OpenAiWorker, AnthropicWorker, LlamaCppWorker, or VllmWorker.
    let worker: Arc<dyn tokio_prompt_orchestrator::ModelWorker> = Arc::new(EchoWorker::new());
    let handles = spawn_pipeline(worker);

    // Send a prompt into the pipeline.
    handles
        .input_tx
        .send(PromptRequest {
            session: SessionId::new("demo"),
            request_id: "req-1".to_string(),
            input: "Hello, pipeline!".to_string(),
            meta: HashMap::new(),
            deadline: None,
        })
        .await?;

    // Collect output from the stream stage.
    let mut guard = handles.output_rx.lock().await;
    if let Some(rx) = guard.as_mut() {
        if let Some(output) = rx.recv().await {
            println!("Response: {}", output.text);
        }
    }
    Ok(())
}

See examples/ for REST, WebSocket, SSE streaming, OpenAI, Anthropic, llama.cpp, and multi-worker round-robin setups.
§API Overview
| Type | Module | Description |
|---|---|---|
| PromptRequest | lib | Input message sent into the pipeline |
| SessionId | lib | Session identifier for affinity sharding |
| OrchestratorError | lib | Crate-level error enum |
| ModelWorker | worker | Async trait implemented by all inference backends |
| EchoWorker | worker | Returns prompt words as tokens; for testing |
| OpenAiWorker | worker | OpenAI chat completions API |
| AnthropicWorker | worker | Anthropic Messages API |
| LlamaCppWorker | worker | Local llama.cpp HTTP server |
| VllmWorker | worker | vLLM inference server |
| LoadBalancedWorker | worker | Round-robin or least-loaded pool of workers |
| spawn_pipeline | stages | Launch the five-stage pipeline, return channel handles |
| spawn_pipeline_with_config | stages | Same, with a full PipelineConfig |
| PipelineConfig | config | TOML-deserialisable root configuration type |
| CircuitBreaker | enhanced | Failure-rate circuit breaker |
| Deduplicator | enhanced | In-flight request coalescer |
| RetryPolicy | enhanced | Exponential backoff with jitter |
| CacheLayer | enhanced | TTL LRU cache for inference results |
| PriorityQueue | enhanced | Four-level priority scheduler |
| DeadLetterQueue | lib | Ring buffer of shed requests |
| send_with_shed | lib | Non-blocking channel send with graceful shedding |
| shard_session | lib | FNV-1a session affinity shard helper |
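The shard_session helper maps a session ID to a shard via FNV-1a hashing, so requests from the same session consistently land on the same shard. A minimal std-only sketch of the idea (an illustrative reimplementation, not the crate's exported function):

```rust
// FNV-1a 64-bit hash of the session ID, reduced modulo the shard count.
// (Illustrative; the crate exports its own shard_session helper.)
fn shard_session(session: &str, shards: u64) -> u64 {
    let mut hash: u64 = 0xcbf2_9ce4_8422_2325; // FNV-1a 64-bit offset basis
    for byte in session.as_bytes() {
        hash ^= u64::from(*byte);                         // xor the byte first...
        hash = hash.wrapping_mul(0x0000_0100_0000_01b3);  // ...then multiply by the FNV prime
    }
    hash % shards
}

fn main() {
    // The same session always maps to the same shard (affinity),
    // while distinct sessions spread across the shard space.
    let a = shard_session("demo", 8);
    assert_eq!(a, shard_session("demo", 8)); // deterministic
    assert!(a < 8);                          // always within range
}
```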
§Configuration Reference
The orchestrator is configured via a TOML file. Pass it with --config pipeline.toml.
[pipeline]
name = "production"
version = "1.0"
description = "Optional description"
[stages.rag]
enabled = true
timeout_ms = 5000
max_context_tokens = 2048
[stages.assemble]
enabled = true
channel_capacity = 512
[stages.inference]
worker = "open_ai" # open_ai | anthropic | llama_cpp | vllm | echo
model = "gpt-4o"
max_tokens = 1024
temperature = 0.7
timeout_ms = 30000
[stages.post_process]
enabled = true
[stages.stream]
enabled = true
[resilience]
retry_attempts = 3
retry_base_ms = 100
retry_max_ms = 5000
circuit_breaker_threshold = 5
circuit_breaker_timeout_s = 60
circuit_breaker_success_rate = 0.8
[rate_limits]
enabled = false
requests_per_second = 100
burst_capacity = 20
[deduplication]
enabled = true
window_s = 300
max_entries = 10000
[observability]
log_format = "json" # pretty | json
metrics_port = 9090 # Prometheus scrape endpoint; omit to disable
tracing_endpoint = "http://jaeger:4318" # OTLP endpoint; omit to disable

Environment variables:
| Variable | Purpose |
|---|---|
| OPENAI_API_KEY | Required for OpenAiWorker |
| ANTHROPIC_API_KEY | Required for AnthropicWorker |
| LLAMA_CPP_URL | llama.cpp server URL (default: http://localhost:8080) |
| VLLM_URL | vLLM server URL (default: http://localhost:8000) |
| RUST_LOG | Log level filter (default: info) |
| RUST_LOG_FORMAT | Set to json for newline-delimited JSON logs |
| JAEGER_ENDPOINT | OTLP HTTP endpoint for distributed tracing |
| OTEL_EXPORTER_OTLP_ENDPOINT | Alternative OTLP endpoint variable |
§Configuration
§Timeouts
Each stage can have its own timeout, and individual requests can carry a per-request deadline. These are configured independently:
[stages.rag]
timeout_ms = 5000 # RAG stage timeout (ms)
[stages.inference]
timeout_ms = 30000 # Per-worker inference call timeout (ms)
# Overrides DEFAULT_INFERENCE_TIMEOUT_SECS (120 s)
[resilience]
retry_base_ms = 100 # Initial retry delay (ms)
retry_max_ms = 5000 # Maximum retry delay after back-off (ms)
retry_attempts = 3 # Maximum number of retries per request

Set a per-request deadline in code:
use tokio_prompt_orchestrator::{PromptRequest, SessionId};
use std::collections::HashMap;
use std::time::{Duration, Instant};
let req = PromptRequest {
session: SessionId::new("user-1"),
request_id: "req-42".into(),
input: "Summarise this document.".into(),
meta: HashMap::new(),
deadline: Some(Instant::now() + Duration::from_secs(10)),
};

§Circuit breaker
The circuit breaker opens after circuit_breaker_threshold consecutive
failures, waits circuit_breaker_timeout_s seconds, then probes in
HALF-OPEN mode. It closes again when the probe success rate meets
circuit_breaker_success_rate.
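A minimal sketch of the state machine these settings drive (illustrative only; the crate's CircuitBreaker in the enhanced module also runs the timeout clock and measures the probe success rate, both simplified to explicit method calls here):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
enum State { Closed, Open, HalfOpen }

// Hypothetical miniature breaker: counts consecutive failures and
// walks the Closed -> Open -> HalfOpen -> Closed cycle.
struct Breaker { state: State, failures: u32, threshold: u32 }

impl Breaker {
    fn new(threshold: u32) -> Self {
        Breaker { state: State::Closed, failures: 0, threshold }
    }
    fn on_failure(&mut self) {
        self.failures += 1;
        if self.failures >= self.threshold { self.state = State::Open; } // trip open
    }
    fn on_timeout_elapsed(&mut self) {
        // circuit_breaker_timeout_s elapsed: allow probe traffic through.
        if self.state == State::Open { self.state = State::HalfOpen; }
    }
    fn on_probe_success(&mut self) {
        // In the real breaker this fires once the probe success rate
        // meets circuit_breaker_success_rate.
        self.failures = 0;
        self.state = State::Closed;
    }
    fn allows_request(&self) -> bool { self.state != State::Open }
}

fn main() {
    let mut b = Breaker::new(5);
    for _ in 0..5 { b.on_failure(); }     // 5 consecutive failures
    assert!(!b.allows_request());         // OPEN: requests rejected fast
    b.on_timeout_elapsed();               // timeout passed
    assert_eq!(b.state, State::HalfOpen); // probing
    b.on_probe_success();
    assert!(b.allows_request());          // CLOSED again
}
```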
[resilience]
circuit_breaker_threshold = 5 # failures before opening
circuit_breaker_timeout_s = 60 # seconds before half-open probe
circuit_breaker_success_rate = 0.8 # probe success rate to re-close

§Rate limiting
Token-bucket rate limiting is opt-in. Enable with --features rate-limiting.
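Conceptually, requests_per_second sets the bucket's refill rate and burst_capacity adds headroom on top of it. A minimal std-only sketch of the token-bucket decision (illustrative; the crate's limiter is built on the governor crate, and this TokenBucket type is hypothetical):

```rust
// Hypothetical token bucket: refills continuously, rejects when empty.
struct TokenBucket {
    tokens: f64,
    capacity: f64,       // steady rate + burst allowance
    refill_per_sec: f64, // requests_per_second
}

impl TokenBucket {
    fn new(requests_per_second: f64, burst_capacity: f64) -> Self {
        let capacity = requests_per_second + burst_capacity;
        TokenBucket { tokens: capacity, capacity, refill_per_sec: requests_per_second }
    }
    fn refill(&mut self, elapsed_secs: f64) {
        // Add tokens for elapsed time, never exceeding the bucket capacity.
        self.tokens = (self.tokens + elapsed_secs * self.refill_per_sec).min(self.capacity);
    }
    fn try_acquire(&mut self) -> bool {
        if self.tokens >= 1.0 { self.tokens -= 1.0; true } else { false }
    }
}

fn main() {
    // e.g. requests_per_second = 2, burst_capacity = 1 -> capacity 3
    let mut bucket = TokenBucket::new(2.0, 1.0);
    assert!(bucket.try_acquire());
    assert!(bucket.try_acquire());
    assert!(bucket.try_acquire());  // burst token
    assert!(!bucket.try_acquire()); // empty: request rejected at entry
    bucket.refill(1.0);             // one second passes -> +2 tokens
    assert!(bucket.try_acquire());
}
```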
[rate_limits]
enabled = true
requests_per_second = 100 # sustained rate (tokens/s)
burst_capacity = 20 # burst allowance on top of the steady rate

§Live Dashboard
Start the TUI dashboard with the tui feature:
cargo run --bin tui --features tui

The dashboard shows per-stage queue depths, circuit breaker state, deduplication hit rate, latency sparklines, and a scrolling log panel.
§Web API
Enable with --features web-api. The REST and WebSocket API is documented in WEB_API.md.
# Single prompt over REST
curl -X POST http://localhost:8080/v1/prompt \
-H "Content-Type: application/json" \
-d '{"input": "What is backpressure?"}'
# Streaming over WebSocket
wscat -c ws://localhost:8080/v1/stream

§MCP Integration
Enable the MCP server with --features mcp and point Claude Desktop at it:
{
"mcpServers": {
"orchestrator": {
"url": "http://127.0.0.1:8080"
}
}
}

§Feature Flags
All features are opt-in. The default build has no optional features.
| Flag | Enables | Required for |
|---|---|---|
| web-api | Axum HTTP server: REST, SSE, WebSocket endpoints | mcp, dashboard |
| metrics-server | Prometheus /metrics HTTP scrape endpoint | – |
| tui | Ratatui terminal dashboard (requires crossterm) | tui binary |
| mcp | Model Context Protocol server (requires web-api) | Claude Desktop integration |
| caching | Redis-backed TTL result cache | – |
| rate-limiting | Token-bucket rate limiter via governor crate | – |
| distributed | Redis cross-node dedup + NATS pub/sub coordination | Multi-node deployments |
| self-tune | PID controllers, telemetry bus, anomaly detector, snapshot store | All self-* features |
| self-modify | MetaTaskGenerator, ValidationGate, AgentMemory (requires self-tune) | self-improving |
| intelligence | LearnedRouter (bandit), Autoscaler, PromptOptimizer, SemanticDedup (requires self-tune) | evolution, self-improving |
| evolution | A/B experiments, snapshot rollback, transfer learning (requires self-tune + intelligence) | – |
| self-improving | Meta-feature: enables all of self-tune, self-modify, intelligence, evolution | self-improve binary |
| full | web-api + metrics-server + caching + rate-limiting | Full-featured single-node deployment |
| schema | JSON Schema export for PipelineConfig via schemars | gen_schema binary |
| dashboard | Web dashboard UI (requires web-api) | dashboard binary |
§Deployment Guide
§Standalone Binary
cargo build --release --features full
./target/release/orchestrator --config pipeline.toml

The orchestrator exposes:
- Web API on port 8080 (configurable)
- Prometheus metrics on port 9090 (configurable)
- TUI dashboard via cargo run --bin tui --features tui
§Docker
docker build -t tokio-prompt-orchestrator .
docker run -p 8080:8080 -p 9090:9090 \
-e ANTHROPIC_API_KEY=sk-ant-... \
tokio-prompt-orchestrator

The docker-compose.yml in the repository starts the orchestrator alongside
Redis (for distributed dedup and caching) and a Prometheus/Grafana stack. A
pre-built Grafana dashboard is available in grafana-dashboard.json.
§Distributed Mode with Redis
Enable the distributed feature and configure Redis:
[distributed]
redis_url = "redis://redis:6379"
nats_url = "nats://nats:4222"
node_id = "node-1"

All nodes share a Redis cluster for cross-node deduplication and leader election. Work is distributed via NATS subjects. Session affinity ensures same-session requests land on the same node when possible.
§Tuning Guide
§Worker Count
Start with one worker per physical core. Increase if inference latency is low and throughput is the bottleneck; decrease if memory pressure is high.
The Autoscaler (enabled with self-tune feature) adjusts this automatically
based on queue depth telemetry.
§Circuit Breaker Thresholds
[resilience]
circuit_breaker_threshold = 5 # failures before opening
circuit_breaker_timeout_s = 60 # seconds before half-open probe
circuit_breaker_success_rate = 0.8 # rate required to close from half-open

For unreliable providers (high timeout rate), lower circuit_breaker_threshold to 3 and increase circuit_breaker_timeout_s to 120.
§Dedup Window
[deduplication]
window_s = 300 # cache TTL in seconds
max_entries = 10000 # maximum cached entries

Increase window_s for workloads with repeated identical prompts (e.g. chatbot FAQs). Decrease for workloads where freshness matters (e.g. real-time queries).
§Channel Buffer Sizes
The default channel sizes are optimised for a typical cloud LLM with 1-10 second inference latency. For local models (< 100ms), reduce all buffers by 50% to save memory. For very slow models (> 30s), double the Inference-to-Post buffer:
spawn_pipeline_with_config(worker, PipelineConfig {
    rag_channel_capacity: 512,
    assemble_channel_capacity: 512,
    inference_channel_capacity: 2048, // doubled for slow models
    post_channel_capacity: 512,
    stream_channel_capacity: 256,
    ..Default::default()
})

§Contributing
- Fork the repository and create a feature branch.
- Run cargo fmt --all and cargo clippy -- -D warnings before pushing.
- Add tests for any new public API surface.
- Open a pull request against master. CI must pass before merge.

See CONTRIBUTING.md for the full guide.
§License
MIT. See LICENSE.
Re-exports§
pub use stages::spawn_pipeline;
pub use stages::spawn_pipeline_with_config;
pub use stages::LogSink;
pub use stages::OutputSink;
pub use stages::PipelineHandles;
pub use stages::SinkError;
pub use worker::stream_worker;
pub use worker::AnthropicWorker;
pub use worker::EchoWorker;
pub use worker::LlamaCppWorker;
pub use worker::LoadBalancedWorker;
pub use worker::ModelWorker;
pub use worker::OpenAiWorker;
pub use worker::VllmWorker;
Modules§
- config
- Stage: Declarative Pipeline Configuration
- coordination
- Coordination — programmatic agent fleet management
- distributed
- Stage: Distributed Clustering
- enhanced
- Enhanced resilience and performance features for the pipeline.
- evolution
- Evolution Subsystem (Phase 4)
- intelligence
- Learned intelligence subsystem — neural routing, prompt optimization, predictive autoscaling, quality estimation, lexical dedup, feedback ingestion. All items require the intelligence feature flag.
- metrics
- Prometheus metrics for the orchestrator pipeline.
- metrics_server
- Metrics HTTP server
- routing
- Stage: Model Routing Intelligence
- self_improve
- Self-Improving Loop
- self_improve_loop
- Self-Improvement Loop
- self_modify
- Self-Modifying Agent Loop
- self_tune
- Self-tuning subsystem - telemetry, PID control, experiments, anomaly detection, cost, snapshots. All items in this module are gated behind the self-tune feature flag.
- stages
- Pipeline stage implementations with structured tracing.
- tui
- Module: TUI Dashboard
- web_api
- Web API Server
- worker
- Model worker abstraction and implementations
Structs§
- AssembleOutput
- Output from the prompt assembly stage.
- DeadLetterQueue
- In-memory dead-letter queue for shed pipeline requests.
- DroppedRequest
- A request that was dropped (shed) by the pipeline due to backpressure or failure. Stored in the DeadLetterQueue for inspection and replay.
- InferenceOutput
- Output from the inference stage.
- PostOutput
- Output from the post-processing stage.
- PromptRequest
- Initial prompt request from a client.
- RagOutput
- Output from the RAG (retrieval-augmented generation) stage.
- SessionId
- Unique session identifier for request tracking and affinity.
Enums§
- OrchestratorError
- Orchestrator-specific errors.
- PipelineStage
- Pipeline stage identifier for metrics and logging.
- SendOutcome
- Outcome of a send_with_shed call.
Functions§
- init_tracing
- Initialise tracing with env-filter support. Call once at binary startup.
- send_with_shed
- Send with graceful shedding on backpressure.
- shard_session
- Session affinity sharding helper.
- try_build_otel_layer
- Attempt to build an OpenTelemetry tracing layer, returning None on error.
Type Aliases§
- OtelLayer
- Type alias for the optional OpenTelemetry tracing layer used in main.rs.