Crate tokio_prompt_orchestrator

§tokio-prompt-orchestrator

A production-grade orchestrator for multi-stage LLM inference pipelines over Tokio.

Provides a five-stage directed pipeline with bounded MPSC channels, backpressure, request deduplication, circuit breakers, retry logic, rate limiting, Prometheus metrics, OpenTelemetry tracing, and an optional autonomous self-improving control loop.

§Architecture

Five-stage pipeline with bounded channels and backpressure:

PromptRequest -> RAG(512) -> Assemble(512) -> Inference(1024) -> Post(512) -> Stream(256)

Each stage runs as an independent tokio::task. When a downstream channel fills, send_with_shed drops the incoming item and records it in the DeadLetterQueue rather than blocking the upstream stage.
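The shedding behaviour can be illustrated with a std-only sketch. This is not the crate's actual implementation (the real pipeline uses Tokio MPSC channels, and the `SendOutcome` stand-in here is hypothetical); it only demonstrates the drop-instead-of-block semantics using a bounded `std::sync::mpsc` channel:

```rust
use std::sync::mpsc::{sync_channel, SyncSender, TrySendError};

// Hypothetical stand-in for the crate's SendOutcome enum.
#[derive(Debug, PartialEq)]
enum SendOutcome {
    Sent,
    Shed,
}

// Non-blocking send: when the channel is full, the item is dropped into
// a dead-letter buffer instead of blocking the upstream stage.
fn send_with_shed<T>(tx: &SyncSender<T>, item: T, dead_letters: &mut Vec<T>) -> SendOutcome {
    match tx.try_send(item) {
        Ok(()) => SendOutcome::Sent,
        Err(TrySendError::Full(item)) | Err(TrySendError::Disconnected(item)) => {
            dead_letters.push(item);
            SendOutcome::Shed
        }
    }
}

fn main() {
    let (tx, _rx) = sync_channel::<u32>(1); // bounded channel, capacity 1
    let mut dlq = Vec::new();
    assert_eq!(send_with_shed(&tx, 1, &mut dlq), SendOutcome::Sent);
    assert_eq!(send_with_shed(&tx, 2, &mut dlq), SendOutcome::Shed); // channel full
    println!("shed {} item(s)", dlq.len());
}
```

The key design point is that `try_send` returns the rejected item, so nothing is silently lost: every shed request lands in the dead-letter buffer for later inspection or replay.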

§Quick Start

use std::collections::HashMap;
use std::sync::Arc;
use tokio_prompt_orchestrator::{spawn_pipeline, EchoWorker, PromptRequest, SessionId, ModelWorker};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let worker: Arc<dyn ModelWorker> = Arc::new(EchoWorker::new());
    let handles = spawn_pipeline(worker);

    handles.input_tx.send(PromptRequest {
        session: SessionId::new("demo"),
        request_id: "req-1".to_string(),
        input: "Hello, pipeline!".to_string(),
        meta: HashMap::new(),
        deadline: None,
    }).await?;

    let mut guard = handles.output_rx.lock().await;
    if let Some(rx) = guard.as_mut() {
        if let Some(output) = rx.recv().await {
            println!("{}", output.text);
        }
    }
    Ok(())
}

§Module Organisation

Module        Description
stages        Five pipeline stage implementations and channel wiring
worker        ModelWorker trait and five production implementations
enhanced      Resilience primitives: circuit breaker, dedup, retry, cache, rate limiter
metrics       Prometheus metrics initialisation and helper functions
config        TOML-deserialisable PipelineConfig with hot-reload support
routing       ModelRouter for complexity-scored local/cloud routing
coordination  Agent fleet management and task claiming
self_tune     PID controllers and telemetry bus (feature: self-tune)
self_modify   Task generation and validation gate (feature: self-modify)
intelligence  Learned router and autoscaler (feature: intelligence)
evolution     A/B experiments and snapshot rollback (feature: evolution)
self_improve  Wired self-improving loop (features: self-tune+self-modify+intelligence)
web_api       REST/SSE/WebSocket server (feature: web-api)
distributed   Redis dedup and NATS coordination (feature: distributed)
tui           Ratatui terminal dashboard (feature: tui)

§tokio-prompt-orchestrator


Multi-core, Tokio-native orchestration for LLM inference pipelines. Built with bounded backpressure channels, circuit breakers, request deduplication, and an optional autonomous self-tuning control loop.


§Architecture

The pipeline is a five-stage directed acyclic graph of bounded async channels. Each stage runs as an independent Tokio task. When a downstream channel fills, excess requests are shed gracefully to a dead-letter queue rather than blocking the upstream stages.

[Prompt Submissions] -> [Dedup Stage] -> [Circuit Breaker] -> [Rate Limiter]
                                                                     |
                                                           [Worker Pool (Tokio)]
                                                                     |
                                                     [LLM Providers (Anthropic/OpenAI)]
                                                                     |
                                                 [Self-Improving Control Loop]
                                                                     |
                       [Prometheus Metrics] <- [OpenTelemetry] <- [Results]
                       [Web API (HTTP/WS)]

Stage-level view with channel capacities:

                        +------------------+
  PromptRequest ------> |  RAG Stage       | cap: 512
                        | (context fetch)  |
                        +--------+---------+
                                 |
                        +--------v---------+
                        |  Assemble Stage  | cap: 512
                        | (prompt builder) |
                        +--------+---------+
                                 |
                        +--------v---------+
                        |  Inference Stage | cap: 1024
                        | (model worker)   |
                        +--------+---------+
                                 |
                        +--------v---------+
                        |  Post Stage      | cap: 512
                        | (filter/format)  |
                        +--------+---------+
                                 |
                        +--------v---------+
                        |  Stream Stage    | cap: 256
                        | (output sink)    |
                        +------------------+

Resilience layers applied at the inference stage:

  • Deduplication: in-flight requests with identical prompts are coalesced into a single inference call; all waiting callers receive the same result.
  • Circuit breaker: opens on consecutive failures, enters half-open probe mode after a configurable timeout.
  • Retry with exponential backoff and jitter.
  • Rate limiter: token-bucket guard at the pipeline entry point.
  • Dead-letter queue: shed requests are stored in a ring buffer for inspection and replay.
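The retry delay schedule can be sketched in a few lines of std-only Rust. This is an illustration of exponential backoff with a cap, not the crate's actual RetryPolicy API; the `jitter_frac` parameter stands in for a random draw the real policy would make:

```rust
// Backoff delay for a 0-based attempt number: base * 2^attempt, capped
// at max_ms, plus a jitter fraction of the capped delay. In a real
// policy jitter_frac would be sampled randomly in [0, 1).
fn backoff_ms(attempt: u32, base_ms: u64, max_ms: u64, jitter_frac: f64) -> u64 {
    let factor = 1u64.checked_shl(attempt.min(20)).unwrap_or(u64::MAX);
    let capped = base_ms.saturating_mul(factor).min(max_ms);
    capped + (capped as f64 * jitter_frac) as u64
}

fn main() {
    // With base 100 ms and max 5000 ms (the defaults shown in the
    // configuration reference below), the schedule without jitter is:
    // 100, 200, 400, 800, 1600, 3200, 5000, 5000, ...
    for attempt in 0..7 {
        println!("attempt {}: {} ms", attempt, backoff_ms(attempt, 100, 5000, 0.0));
    }
}
```

Capping at `retry_max_ms` prevents the delay from growing without bound, and the jitter spreads out retries from many callers so they do not hammer a recovering provider in lockstep.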

§Quickstart

§Prebuilt binary (no Rust required)

Download orchestrator.exe from the releases page and run it. The first launch runs an interactive setup wizard that saves your API key and provider preference to orchestrator.env.

Which AI provider do you want to use?

1) Anthropic  (Claude)
2) OpenAI     (GPT-4o)
3) llama.cpp  (local, no key)
4) echo       (offline test mode)

Enter 1, 2, 3, or 4 [1]:

After setup the orchestrator starts a terminal REPL and a web API on http://127.0.0.1:8080 simultaneously. You can type prompts in the terminal while agents and IDEs connect over HTTP in the background.

§Library usage

Add the dependency:

[dependencies]
tokio-prompt-orchestrator = "0.1"
tokio = { version = "1", features = ["rt-multi-thread", "macros"] }

Minimal async example using the echo worker (no API key required):

use std::collections::HashMap;
use tokio_prompt_orchestrator::{
    spawn_pipeline, EchoWorker, PromptRequest, SessionId,
};
use std::sync::Arc;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Spawn the five-stage pipeline with the echo worker.
    // Swap EchoWorker for OpenAiWorker, AnthropicWorker, LlamaCppWorker, or VllmWorker.
    let worker: Arc<dyn tokio_prompt_orchestrator::ModelWorker> = Arc::new(EchoWorker::new());
    let handles = spawn_pipeline(worker);

    // Send a prompt into the pipeline.
    handles
        .input_tx
        .send(PromptRequest {
            session: SessionId::new("demo"),
            request_id: "req-1".to_string(),
            input: "Hello, pipeline!".to_string(),
            meta: HashMap::new(),
            deadline: None,
        })
        .await?;

    // Collect output from the stream stage.
    let mut guard = handles.output_rx.lock().await;
    if let Some(rx) = guard.as_mut() {
        if let Some(output) = rx.recv().await {
            println!("Response: {}", output.text);
        }
    }

    Ok(())
}

See examples/ for REST, WebSocket, SSE streaming, OpenAI, Anthropic, llama.cpp, and multi-worker round-robin setups.


§API Overview

Type                        Module    Description
PromptRequest               lib       Input message sent into the pipeline
SessionId                   lib       Session identifier for affinity sharding
OrchestratorError           lib       Crate-level error enum
ModelWorker                 worker    Async trait implemented by all inference backends
EchoWorker                  worker    Returns prompt words as tokens; for testing
OpenAiWorker                worker    OpenAI chat completions API
AnthropicWorker             worker    Anthropic Messages API
LlamaCppWorker              worker    Local llama.cpp HTTP server
VllmWorker                  worker    vLLM inference server
LoadBalancedWorker          worker    Round-robin or least-loaded pool of workers
spawn_pipeline              stages    Launch the five-stage pipeline, return channel handles
spawn_pipeline_with_config  stages    Same, with a full PipelineConfig
PipelineConfig              config    TOML-deserialisable root configuration type
CircuitBreaker              enhanced  Failure-rate circuit breaker
Deduplicator                enhanced  In-flight request coalescer
RetryPolicy                 enhanced  Exponential backoff with jitter
CacheLayer                  enhanced  TTL LRU cache for inference results
PriorityQueue               enhanced  Four-level priority scheduler
DeadLetterQueue             lib       Ring buffer of shed requests
send_with_shed              lib       Non-blocking channel send with graceful shedding
shard_session               lib       FNV-1a session affinity shard helper
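
An FNV-1a affinity helper of the kind `shard_session` describes can be written in a few lines of std-only Rust. This is a sketch of the general technique; the crate's actual helper may differ in hash width or reduction:

```rust
// 64-bit FNV-1a constants.
const FNV_OFFSET: u64 = 0xcbf2_9ce4_8422_2325;
const FNV_PRIME: u64 = 0x0000_0100_0000_01b3;

// Hash the session id with FNV-1a and reduce modulo the shard count,
// so the same session always maps to the same shard.
fn shard_session(session_id: &str, shards: usize) -> usize {
    let mut hash = FNV_OFFSET;
    for byte in session_id.as_bytes() {
        hash ^= u64::from(*byte);
        hash = hash.wrapping_mul(FNV_PRIME);
    }
    (hash % shards as u64) as usize
}

fn main() {
    // Identical session ids always land on the same shard.
    println!("demo -> shard {}", shard_session("demo", 4));
}
```

FNV-1a is a good fit for affinity sharding because it is cheap, allocation-free, and deterministic across processes, which matters when multiple nodes need to agree on which shard owns a session.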

§Configuration Reference

The orchestrator is configured via a TOML file. Pass it with --config pipeline.toml.

[pipeline]
name        = "production"
version     = "1.0"
description = "Optional description"

[stages.rag]
enabled           = true
timeout_ms        = 5000
max_context_tokens = 2048

[stages.assemble]
enabled          = true
channel_capacity = 512

[stages.inference]
worker      = "open_ai"   # open_ai | anthropic | llama_cpp | vllm | echo
model       = "gpt-4o"
max_tokens  = 1024
temperature = 0.7
timeout_ms  = 30000

[stages.post_process]
enabled = true

[stages.stream]
enabled = true

[resilience]
retry_attempts               = 3
retry_base_ms                = 100
retry_max_ms                 = 5000
circuit_breaker_threshold    = 5
circuit_breaker_timeout_s    = 60
circuit_breaker_success_rate = 0.8

[rate_limits]
enabled             = false
requests_per_second = 100
burst_capacity      = 20

[deduplication]
enabled     = true
window_s    = 300
max_entries = 10000

[observability]
log_format        = "json"    # pretty | json
metrics_port      = 9090      # Prometheus scrape endpoint; omit to disable
tracing_endpoint  = "http://jaeger:4318"  # OTLP endpoint; omit to disable

Environment variables:

Variable                     Purpose
OPENAI_API_KEY               Required for OpenAiWorker
ANTHROPIC_API_KEY            Required for AnthropicWorker
LLAMA_CPP_URL                llama.cpp server URL (default: http://localhost:8080)
VLLM_URL                     vLLM server URL (default: http://localhost:8000)
RUST_LOG                     Log level filter (default: info)
RUST_LOG_FORMAT              Set to json for newline-delimited JSON logs
JAEGER_ENDPOINT              OTLP HTTP endpoint for distributed tracing
OTEL_EXPORTER_OTLP_ENDPOINT  Alternative OTLP endpoint variable

§Configuration

§Timeouts

Each stage can have its own timeout, and individual requests can carry a per-request deadline. These are configured independently:

[stages.rag]
timeout_ms = 5000       # RAG stage timeout (ms)

[stages.inference]
timeout_ms = 30000      # Per-worker inference call timeout (ms)
                        # Overrides DEFAULT_INFERENCE_TIMEOUT_SECS (120 s)

[resilience]
retry_base_ms = 100     # Initial retry delay (ms)
retry_max_ms  = 5000    # Maximum retry delay after back-off (ms)
retry_attempts = 3      # Maximum number of retries per request

Set a per-request deadline in code:

use tokio_prompt_orchestrator::{PromptRequest, SessionId};
use std::collections::HashMap;
use std::time::{Duration, Instant};

let req = PromptRequest {
    session: SessionId::new("user-1"),
    request_id: "req-42".into(),
    input: "Summarise this document.".into(),
    meta: HashMap::new(),
    deadline: Some(Instant::now() + Duration::from_secs(10)),
};

§Circuit breaker

The circuit breaker opens after circuit_breaker_threshold consecutive failures, waits circuit_breaker_timeout_s seconds, then probes in HALF-OPEN mode. It closes again when the probe success rate meets circuit_breaker_success_rate.

[resilience]
circuit_breaker_threshold    = 5    # failures before opening
circuit_breaker_timeout_s    = 60   # seconds before half-open probe
circuit_breaker_success_rate = 0.8  # probe success rate to re-close
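
The state machine behind these knobs can be sketched in std-only Rust. This is a simplified illustration, not the crate's CircuitBreaker type: it uses a single half-open probe rather than a success-rate window, and the names are illustrative:

```rust
use std::time::{Duration, Instant};

#[derive(Debug, PartialEq)]
enum BreakerState { Closed, Open, HalfOpen }

// Consecutive-failure breaker mirroring the config knobs above.
struct Breaker {
    state: BreakerState,
    consecutive_failures: u32,
    threshold: u32,          // circuit_breaker_threshold
    timeout: Duration,       // circuit_breaker_timeout_s
    opened_at: Option<Instant>,
}

impl Breaker {
    fn new(threshold: u32, timeout: Duration) -> Self {
        Breaker { state: BreakerState::Closed, consecutive_failures: 0,
                  threshold, timeout, opened_at: None }
    }

    // Called before each request; may transition Open -> HalfOpen.
    fn allow(&mut self) -> bool {
        if self.state == BreakerState::Open {
            if self.opened_at.map_or(false, |t| t.elapsed() >= self.timeout) {
                self.state = BreakerState::HalfOpen; // let one probe through
            } else {
                return false; // still open: fail fast
            }
        }
        true
    }

    // Called after each request with its outcome.
    fn record(&mut self, success: bool) {
        if success {
            self.consecutive_failures = 0;
            self.state = BreakerState::Closed;
        } else {
            self.consecutive_failures += 1;
            if self.consecutive_failures >= self.threshold {
                self.state = BreakerState::Open;
                self.opened_at = Some(Instant::now());
            }
        }
    }
}

fn main() {
    let mut b = Breaker::new(3, Duration::from_secs(60));
    b.record(false); b.record(false); b.record(false); // third failure opens it
    println!("request allowed: {}", b.allow());
}
```

Failing fast while open is the point: callers get an immediate error instead of queueing requests against a provider that is already struggling, which keeps channel buffers free for traffic that can actually succeed.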

§Rate limiting

Token-bucket rate limiting is opt-in. Enable with --features rate-limiting.

[rate_limits]
enabled             = true
requests_per_second = 100   # sustained rate (tokens/s)
burst_capacity      = 20    # burst allowance on top of the steady rate
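
A token bucket with these two knobs can be sketched deterministically in std-only Rust. This illustrates the algorithm, not the crate's limiter (which the feature table notes is built on the governor crate); elapsed time is passed in explicitly so the behaviour is easy to follow:

```rust
// Token bucket: refills at a sustained rate, holds burst headroom on top.
struct TokenBucket {
    capacity: f64,        // requests_per_second + burst_capacity
    tokens: f64,
    refill_per_sec: f64,  // requests_per_second
}

impl TokenBucket {
    fn new(rate: f64, burst: f64) -> Self {
        TokenBucket { capacity: rate + burst, tokens: rate + burst, refill_per_sec: rate }
    }

    // Credit tokens for `elapsed_s` seconds, then try to take one.
    fn try_acquire(&mut self, elapsed_s: f64) -> bool {
        self.tokens = (self.tokens + elapsed_s * self.refill_per_sec).min(self.capacity);
        if self.tokens >= 1.0 {
            self.tokens -= 1.0;
            true
        } else {
            false
        }
    }
}

fn main() {
    // 1 req/s sustained with a burst allowance of 1: two requests pass
    // back-to-back, the third is rejected until tokens refill.
    let mut bucket = TokenBucket::new(1.0, 1.0);
    println!("{} {} {}",
        bucket.try_acquire(0.0), bucket.try_acquire(0.0), bucket.try_acquire(0.0));
}
```

The burst allowance lets short spikes through without raising the steady-state rate, which suits LLM traffic where clients tend to send a few requests in quick succession and then go quiet.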

§Live Dashboard

Start the TUI dashboard with the tui feature:

cargo run --bin tui --features tui

The dashboard shows per-stage queue depths, circuit breaker state, deduplication hit rate, latency sparklines, and a scrolling log panel.


§Web API

Enable with --features web-api. The REST and WebSocket API is documented in WEB_API.md.

# Single prompt over REST
curl -X POST http://localhost:8080/v1/prompt \
  -H "Content-Type: application/json" \
  -d '{"input": "What is backpressure?"}'

# Streaming over WebSocket
wscat -c ws://localhost:8080/v1/stream

§MCP Integration

Enable the MCP server with --features mcp and point Claude Desktop at it:

{
  "mcpServers": {
    "orchestrator": {
      "url": "http://127.0.0.1:8080"
    }
  }
}

§Feature Flags

All features are opt-in. The default build has no optional features.

  • web-api: Axum HTTP server with REST, SSE, and WebSocket endpoints. Required for mcp and dashboard.
  • metrics-server: Prometheus /metrics HTTP scrape endpoint.
  • tui: Ratatui terminal dashboard (requires crossterm). Required for the tui binary.
  • mcp: Model Context Protocol server (requires web-api). Required for Claude Desktop integration.
  • caching: Redis-backed TTL result cache.
  • rate-limiting: token-bucket rate limiter via the governor crate.
  • distributed: Redis cross-node dedup + NATS pub/sub coordination. Required for multi-node deployments.
  • self-tune: PID controllers, telemetry bus, anomaly detector, snapshot store. Required by all self-* features.
  • self-modify: MetaTaskGenerator, ValidationGate, AgentMemory (requires self-tune). Required for self-improving.
  • intelligence: LearnedRouter (bandit), Autoscaler, PromptOptimizer, SemanticDedup (requires self-tune). Required for evolution and self-improving.
  • evolution: A/B experiments, snapshot rollback, transfer learning (requires self-tune + intelligence).
  • self-improving: meta-feature enabling all of self-tune, self-modify, intelligence, and evolution. Required for the self-improve binary.
  • full: web-api + metrics-server + caching + rate-limiting. Full-featured single-node deployment.
  • schema: JSON Schema export for PipelineConfig via schemars. Required for the gen_schema binary.
  • dashboard: web dashboard UI (requires web-api). Required for the dashboard binary.

§Deployment Guide

§Standalone Binary

cargo build --release --features full
./target/release/orchestrator --config pipeline.toml

The orchestrator exposes:

  • Web API on port 8080 (configurable)
  • Prometheus metrics on port 9090 (configurable)
  • TUI dashboard via cargo run --bin tui --features tui

§Docker

docker build -t tokio-prompt-orchestrator .
docker run -p 8080:8080 -p 9090:9090 \
  -e ANTHROPIC_API_KEY=sk-ant-... \
  tokio-prompt-orchestrator

The docker-compose.yml in the repository starts the orchestrator alongside Redis (for distributed dedup and caching) and a Prometheus/Grafana stack. A pre-built Grafana dashboard is available in grafana-dashboard.json.

§Distributed Mode with Redis

Enable the distributed feature and configure Redis:

[distributed]
redis_url = "redis://redis:6379"
nats_url  = "nats://nats:4222"
node_id   = "node-1"

All nodes share a Redis cluster for cross-node deduplication and leader election. Work is distributed via NATS subjects. Session affinity ensures same-session requests land on the same node when possible.


§Tuning Guide

§Worker Count

Start with one worker per physical core. Increase if inference latency is low and throughput is the bottleneck; decrease if memory pressure is high.

The Autoscaler (enabled with self-tune feature) adjusts this automatically based on queue depth telemetry.

§Circuit Breaker Thresholds

[resilience]
circuit_breaker_threshold    = 5    # failures before opening
circuit_breaker_timeout_s    = 60   # seconds before half-open probe
circuit_breaker_success_rate = 0.8  # rate required to close from half-open

For unreliable providers (high timeout rate), lower circuit_breaker_threshold to 3 and increase circuit_breaker_timeout_s to 120.

§Dedup Window

[deduplication]
window_s    = 300    # cache TTL in seconds
max_entries = 10000  # maximum cached entries

Increase window_s for workloads with repeated identical prompts (e.g. chatbot FAQs). Decrease for workloads where freshness matters (e.g. real-time queries).
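The windowed part of deduplication can be sketched as a TTL cache in std-only Rust. This is illustrative only: the crate's Deduplicator also coalesces concurrent in-flight requests, which this sketch omits, and the type and field names here are hypothetical:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// TTL-windowed result cache keyed by prompt text, mirroring the
// window_s and max_entries knobs above.
struct DedupWindow {
    window: Duration,                            // window_s
    max_entries: usize,                          // max_entries
    entries: HashMap<String, (Instant, String)>, // prompt -> (inserted_at, result)
}

impl DedupWindow {
    fn new(window: Duration, max_entries: usize) -> Self {
        DedupWindow { window, max_entries, entries: HashMap::new() }
    }

    // A hit only counts while the entry is inside the TTL window.
    fn get(&self, prompt: &str) -> Option<&str> {
        self.entries.get(prompt).and_then(|(at, result)| {
            (at.elapsed() < self.window).then_some(result.as_str())
        })
    }

    fn insert(&mut self, prompt: String, result: String) {
        // Crude eviction: purge expired entries once at capacity.
        if self.entries.len() >= self.max_entries {
            let window = self.window;
            self.entries.retain(|_, (at, _)| at.elapsed() < window);
        }
        self.entries.insert(prompt, (Instant::now(), result));
    }
}

fn main() {
    let mut dedup = DedupWindow::new(Duration::from_secs(300), 10_000);
    dedup.insert("What is backpressure?".into(), "cached answer".into());
    println!("hit: {:?}", dedup.get("What is backpressure?"));
}
```

Tuning is the trade-off the paragraph above describes: a longer window converts more repeat traffic into cache hits, while a shorter one bounds how stale a served answer can be.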

§Channel Buffer Sizes

The default channel sizes are optimised for a typical cloud LLM with 1-10 second inference latency. For local models (< 100ms), reduce all buffers by 50% to save memory. For very slow models (> 30s), double the Inference-to-Post buffer:

spawn_pipeline_with_config(worker, PipelineConfig {
    rag_channel_capacity: 512,
    assemble_channel_capacity: 512,
    inference_channel_capacity: 2048,   // doubled for slow models
    post_channel_capacity: 512,
    stream_channel_capacity: 256,
    ..Default::default()
})

§Contributing

  1. Fork the repository and create a feature branch.
  2. Run cargo fmt --all and cargo clippy -- -D warnings before pushing.
  3. Add tests for any new public API surface.
  4. Open a pull request against master. CI must pass before merge.

See CONTRIBUTING.md for the full guide.


§License

MIT. See LICENSE.

Re-exports§

pub use stages::spawn_pipeline;
pub use stages::spawn_pipeline_with_config;
pub use stages::LogSink;
pub use stages::OutputSink;
pub use stages::PipelineHandles;
pub use stages::SinkError;
pub use worker::stream_worker;
pub use worker::AnthropicWorker;
pub use worker::EchoWorker;
pub use worker::LlamaCppWorker;
pub use worker::LoadBalancedWorker;
pub use worker::ModelWorker;
pub use worker::OpenAiWorker;
pub use worker::VllmWorker;

Modules§

config
Stage: Declarative Pipeline Configuration
coordination
Coordination — programmatic agent fleet management
distributed
Stage: Distributed Clustering
enhanced
Enhanced resilience and performance features for the pipeline.
evolution
Evolution Subsystem (Phase 4)
intelligence
Learned intelligence subsystem — neural routing, prompt optimization, predictive autoscaling, quality estimation, lexical dedup, feedback ingestion. All items require the intelligence feature flag.
metrics
Prometheus metrics for the orchestrator pipeline.
metrics_server
Metrics HTTP server
routing
Stage: Model Routing Intelligence
self_improve
Self-Improving Loop
self_improve_loop
Self-Improvement Loop
self_modify
Self-Modifying Agent Loop
self_tune
Self-tuning subsystem - telemetry, PID control, experiments, anomaly detection, cost, snapshots. All items in this module are gated behind the self-tune feature flag.
stages
Pipeline stage implementations with structured tracing.
tui
Module: TUI Dashboard
web_api
Web API Server
worker
Model worker abstraction and implementations

Structs§

AssembleOutput
Output from the prompt assembly stage.
DeadLetterQueue
In-memory dead-letter queue for shed pipeline requests.
DroppedRequest
A request that was dropped (shed) by the pipeline due to backpressure or failure. Stored in the DeadLetterQueue for inspection and replay.
InferenceOutput
Output from the inference stage.
PostOutput
Output from the post-processing stage.
PromptRequest
Initial prompt request from client
RagOutput
Output from the RAG (retrieval-augmented generation) stage.
SessionId
Unique session identifier for request tracking and affinity

Enums§

OrchestratorError
Orchestrator-specific errors.
PipelineStage
Pipeline stage identifier for metrics and logging.
SendOutcome
Outcome of a send_with_shed call.

Functions§

init_tracing
Initialise tracing with env-filter support. Call once at binary startup.
send_with_shed
Send with graceful shedding on backpressure.
shard_session
Session affinity sharding helper.
try_build_otel_layer
Attempt to build an OpenTelemetry tracing layer, returning None on error.

Type Aliases§

OtelLayer
Type alias for the optional OpenTelemetry tracing layer used in main.rs.