rust-pipe 0.1.1

Lightweight typed task dispatch from Rust to polyglot workers (TypeScript, Python, Go, Java, C#, Ruby, Elixir, Swift, PHP)

What it does

rust-pipe sits between "raw gRPC" (too low-level) and "Temporal" (too heavy). It gives you typed task dispatch from a Rust orchestrator to workers written in any language, with zero boilerplate.

%%{init: {'theme': 'dark', 'themeVariables': { 'primaryColor': '#1f2937', 'primaryTextColor': '#f9fafb', 'primaryBorderColor': '#6366f1', 'lineColor': '#818cf8', 'secondaryColor': '#374151', 'tertiaryColor': '#111827', 'edgeLabelBackground': '#1f2937', 'fontSize': '14px'}}}%%

graph LR
    subgraph RUST["🦀 Rust Dispatcher"]
        direction TB
        D[/"Task Dispatch Engine"\]
        P[("Worker Pool")]
        D --- P
    end

    subgraph TRANSPORTS["⚡ Transports"]
        direction TB
        WS["🌐 WebSocket"]
        STDIO["📟 stdio"]
        DOCK["🐳 Docker"]
        SSHT["🔑 SSH"]
        WASM["⚙️ WASM"]
    end

    subgraph WORKERS["🔧 Workers"]
        direction TB
        TS["TypeScript"]
        PY["Python"]
        GO["Go"]
        JAVA["Java"]
        CS["C#"]
        RUBY["Ruby"]
        CLI["Any CLI"]
    end

    RUST ==>|route| TRANSPORTS
    TRANSPORTS ==>|deliver| WORKERS

    style RUST fill:#1e1b4b,stroke:#6366f1,stroke-width:2px,color:#e0e7ff
    style TRANSPORTS fill:#172554,stroke:#3b82f6,stroke-width:2px,color:#dbeafe
    style WORKERS fill:#052e16,stroke:#22c55e,stroke-width:2px,color:#dcfce7

How it works

%%{init: {'theme': 'dark', 'themeVariables': { 'primaryColor': '#1f2937', 'primaryTextColor': '#f9fafb', 'primaryBorderColor': '#6366f1', 'lineColor': '#818cf8', 'secondaryColor': '#374151', 'tertiaryColor': '#111827', 'actorBkg': '#312e81', 'actorBorder': '#6366f1', 'actorTextColor': '#e0e7ff', 'activationBkgColor': '#1e3a5f', 'activationBorderColor': '#3b82f6', 'signalColor': '#818cf8', 'noteBkgColor': '#1c1917', 'noteBorderColor': '#a16207', 'noteTextColor': '#fef3c7'}}}%%

sequenceDiagram
    participant D as 🦀 Dispatcher
    participant W as 🔧 Worker

    rect rgba(99, 102, 241, 0.1)
        Note over D,W: 🔌 Connection Phase
        W->>+D: Connect via WebSocket
        W->>D: WorkerRegister {tasks, capacity}
        D->>-W: WorkerRegistered ✓
        D-->>D: Add to pool (least-loaded routing)
    end

    rect rgba(34, 197, 94, 0.1)
        Note over D,W: 🚀 Dispatch Phase
        D->>D: select_and_reserve(task_type)
        D->>+W: TaskDispatch {taskType, payload}
        W->>W: Execute handler
        W->>-D: TaskResult {status: Completed, payload}
        D-->>D: Resolve DispatchResult to caller
    end

    rect rgba(59, 130, 246, 0.1)
        Note over D,W: 💓 Health Monitoring
        loop Every 5 seconds
            W->>D: Heartbeat {activeTasks, capacity}
        end
    end

    rect rgba(239, 68, 68, 0.1)
        Note over D,W: ☠️ Failure Detection
        D->>D: Heartbeat timeout exceeded
        D-->>D: Mark worker Dead
        D-->>D: Fail pending tasks → WorkerDisconnected
    end
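
The failure-detection phase above can be sketched in a few lines. This is a minimal Python illustration, not the crate's implementation: `WorkerState`, `record_heartbeat`, `reap_dead_workers`, and the 15-second `HEARTBEAT_TIMEOUT_S` are all hypothetical names and values. The README states only that heartbeats arrive every 5 seconds and that a timeout marks the worker dead and fails its pending tasks with `WorkerDisconnected`.

```python
from dataclasses import dataclass, field

# Assumed threshold (three missed 5-second heartbeats); the real value is not documented here.
HEARTBEAT_TIMEOUT_S = 15.0


@dataclass
class WorkerState:
    worker_id: str
    last_heartbeat: float  # timestamp in seconds of the most recent Heartbeat
    pending_tasks: set = field(default_factory=set)
    dead: bool = False


def record_heartbeat(worker: WorkerState, now: float) -> None:
    """Refresh the liveness timestamp when a Heartbeat message arrives."""
    worker.last_heartbeat = now


def reap_dead_workers(workers: dict, now: float) -> dict:
    """Mark timed-out workers dead and return {task_id: error} for their pending tasks."""
    failed = {}
    for worker in workers.values():
        if worker.dead or now - worker.last_heartbeat <= HEARTBEAT_TIMEOUT_S:
            continue
        worker.dead = True
        for task_id in sorted(worker.pending_tasks):
            failed[task_id] = "WorkerDisconnected"
        worker.pending_tasks.clear()
    return failed
```

Passing `now` explicitly keeps the check deterministic; a real loop would presumably call it on a timer with a monotonic clock.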

Architecture

%%{init: {'theme': 'dark', 'themeVariables': { 'primaryColor': '#1f2937', 'primaryTextColor': '#f9fafb', 'primaryBorderColor': '#6366f1', 'lineColor': '#818cf8', 'secondaryColor': '#374151', 'fontSize': '13px'}}}%%

graph TB
    subgraph CORE["🦀 rust-pipe crate"]
        direction TB
        DISP["<b>Dispatcher</b><br/>Task routing & lifecycle"]
        POOL["<b>Worker Pool</b><br/>Least-loaded selection"]
        VALID["<b>Validation</b><br/>Input sanitization"]
        SCHEMA["<b>Schema</b><br/>Task, Result, Priority"]
        DISP --> POOL
        DISP --> VALID
        DISP --> SCHEMA
    end

    subgraph TRANS["⚡ Transport Layer"]
        direction LR
        WS["🌐<br/>WebSocket<br/><i>networked</i>"]
        STDIO["📟<br/>stdio<br/><i>local CLI</i>"]
        DOCK["🐳<br/>Docker<br/><i>isolated</i>"]
        SSHT["🔑<br/>SSH<br/><i>remote</i>"]
        WASMT["⚙️<br/>WASM<br/><i>sandboxed</i>"]
    end

    subgraph SDKS["📦 Language SDKs"]
        direction LR
        S1["TypeScript<br/><code>@rust-pipe/worker</code>"]
        S2["Python<br/><code>pip install rust-pipe</code>"]
        S3["Go<br/><code>rust-pipe-go</code>"]
        S4["Java · C# · Ruby<br/>Elixir · Swift · PHP"]
        S5["<b>Any CLI</b><br/><i>No SDK needed</i>"]
    end

    CORE ==> TRANS
    TRANS ==> SDKS

    style CORE fill:#1e1b4b,stroke:#6366f1,stroke-width:2px,color:#e0e7ff
    style TRANS fill:#172554,stroke:#3b82f6,stroke-width:2px,color:#dbeafe
    style SDKS fill:#052e16,stroke:#22c55e,stroke-width:2px,color:#dcfce7
    style DISP fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style POOL fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style VALID fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style SCHEMA fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style WS fill:#1e3a5f,stroke:#60a5fa,color:#dbeafe
    style STDIO fill:#1e3a5f,stroke:#60a5fa,color:#dbeafe
    style DOCK fill:#1e3a5f,stroke:#60a5fa,color:#dbeafe
    style SSHT fill:#1e3a5f,stroke:#60a5fa,color:#dbeafe
    style WASMT fill:#1e3a5f,stroke:#60a5fa,color:#dbeafe

Features

| Feature | Description |
|---|---|
| 🎯 Least-loaded routing | Tasks go to the worker with the most available capacity |
| 💀 Dead worker detection | A heartbeat timeout marks the worker dead and fails its pending tasks |
| 🚦 Backpressure | Workers signal overload and the dispatcher throttles |
| 🛑 Graceful shutdown | stop() drains tasks and disconnects cleanly |
| 🔒 Input validation | All transport configs are validated against command injection |
| 🔄 Idempotent start | Safe to call start() multiple times |
| ⏱️ Timeout support | Per-task timeouts with await_with_timeout() |
| 📊 Pool stats | Real-time visibility into worker capacity and utilization |
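
To make least-loaded routing and backpressure concrete, here is a minimal Python sketch. It assumes that "most available capacity" means the largest number of free slots (`max_concurrency` minus active tasks) and that a throttled worker is skipped entirely. The `Worker` class and its fields are hypothetical. Only the name `select_and_reserve` is borrowed from the sequence diagram, and this is not the crate's actual code.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class Worker:
    worker_id: str
    supported_tasks: frozenset
    max_concurrency: int
    active_tasks: int = 0
    throttled: bool = False  # set when the worker reports backpressure

    @property
    def free_slots(self) -> int:
        return self.max_concurrency - self.active_tasks


def select_and_reserve(pool: list, task_type: str) -> Optional[Worker]:
    """Pick the eligible worker with the most free slots and reserve one slot on it."""
    candidates = [
        w for w in pool
        if task_type in w.supported_tasks and not w.throttled and w.free_slots > 0
    ]
    if not candidates:
        return None
    chosen = max(candidates, key=lambda w: w.free_slots)
    # Reserve before dispatching so the next selection already sees the added load.
    chosen.active_tasks += 1
    return chosen
```

Selecting and reserving in one step is the design point: if the slot were counted only after the task was sent, two back-to-back dispatches could both pick the same worker.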

Task Lifecycle

%%{init: {'theme': 'dark', 'themeVariables': { 'primaryColor': '#312e81', 'primaryTextColor': '#e0e7ff', 'primaryBorderColor': '#6366f1', 'lineColor': '#818cf8', 'secondaryColor': '#1e3a5f', 'tertiaryColor': '#1c1917', 'fontSize': '13px'}}}%%

stateDiagram-v2
    direction LR

    [*] --> Pending: Task::new()
    Pending --> Dispatched: dispatch()
    Dispatched --> Running: Worker receives

    Running --> Completed: ✅ Success
    Running --> Failed: ❌ Error
    Running --> TimedOut: ⏱️ Deadline exceeded

    Dispatched --> Failed: Transport error
    Running --> Cancelled: Kill signal

    Completed --> [*]
    Failed --> [*]
    TimedOut --> [*]
    Cancelled --> [*]
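
The state diagram translates directly into a transition table. The Python sketch below is illustrative only: `TaskState`, `TRANSITIONS`, `advance`, and `is_terminal` are hypothetical names, and the table is transcribed from the diagram rather than from the crate's source.

```python
from enum import Enum


class TaskState(Enum):
    PENDING = "Pending"
    DISPATCHED = "Dispatched"
    RUNNING = "Running"
    COMPLETED = "Completed"
    FAILED = "Failed"
    TIMED_OUT = "TimedOut"
    CANCELLED = "Cancelled"


# Allowed transitions, transcribed from the state diagram.
TRANSITIONS = {
    TaskState.PENDING: {TaskState.DISPATCHED},
    TaskState.DISPATCHED: {TaskState.RUNNING, TaskState.FAILED},
    TaskState.RUNNING: {
        TaskState.COMPLETED,
        TaskState.FAILED,
        TaskState.TIMED_OUT,
        TaskState.CANCELLED,
    },
}


def is_terminal(state: TaskState) -> bool:
    """Terminal states have no outgoing transitions."""
    return state not in TRANSITIONS


def advance(current: TaskState, target: TaskState) -> TaskState:
    """Return the new state, or raise ValueError if the diagram forbids the move."""
    if target not in TRANSITIONS.get(current, set()):
        raise ValueError(f"illegal transition {current.value} -> {target.value}")
    return target
```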

Quick start

Rust (dispatcher)

use rust_pipe::prelude::*;
use serde_json::json;
use std::time::Duration;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let dispatcher = Dispatcher::builder()
        .host("0.0.0.0")
        .port(9876)
        .build();

    dispatcher.start().await?;

    // Workers connect via WebSocket and register their supported task types.
    // Once connected, dispatch tasks:
    let task = Task::new("scan-target", json!({
        "url": "https://example.com",
        "checks": ["xss", "sqli"]
    }))
    .with_timeout(60_000)
    .with_priority(Priority::High);

    let handle = dispatcher.dispatch(task).await?;
    let result = handle.await_with_timeout(Duration::from_secs(60)).await?;

    println!("Result: {:?}", result.payload);
    Ok(())
}

TypeScript (worker)

import { createWorker } from '@rust-pipe/worker';

const worker = createWorker({
  url: 'ws://localhost:9876',
  handlers: {
    'scan-target': async (task) => {
      // Do work...
      return { vulnerabilities: 3 };
    },
  },
});

await worker.start();

Python (worker)

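import asyncio

from rust_pipe import create_worker

async def scan_target(task):
    # Do work...
    return {"vulnerabilities": 3}

async def main():
    worker = create_worker(
        url="ws://localhost:9876",
        handlers={"scan-target": scan_target},
    )
    # `await` is only valid inside a coroutine, so run the worker from main().
    await worker.start()

asyncio.run(main())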

Bash (worker via stdio, no SDK needed)

#!/bin/bash
# Read JSON task from stdin, write JSON result to stdout
while IFS= read -r line; do
  task_id=$(echo "$line" | jq -r '.task.id')
  # Build the reply with jq so the task ID is always correctly JSON-escaped
  jq -cn --arg id "$task_id" '{type:"TaskResult",result:{taskId:$id,status:"Completed",payload:{done:true},durationMs:1,workerId:"bash"}}'
done

SDKs

| Language | Package | Install |
|---|---|---|
| TypeScript | @rust-pipe/worker | npm install @rust-pipe/worker |
| Python | rust-pipe | pip install rust-pipe |
| Go | rust-pipe-go | go get github.com/albyte-ai/rust-pipe-go |
| Java | io.rustpipe:rust-pipe-worker | Maven Central |
| C# | RustPipe.Worker | dotnet add package RustPipe.Worker |
| Ruby | rust_pipe | gem install rust_pipe |
| Elixir | rust_pipe | {:rust_pipe, "~> 0.1.0"} in mix.exs |
| Swift | RustPipe | Swift Package Manager |
| PHP | rust-pipe/worker | composer require rust-pipe/worker |
| Any CLI | None needed | Read/write JSON on stdin/stdout |

Wire Protocol

%%{init: {'theme': 'dark', 'themeVariables': { 'primaryColor': '#1f2937', 'primaryTextColor': '#f9fafb', 'primaryBorderColor': '#6366f1', 'lineColor': '#818cf8', 'fontSize': '12px'}}}%%

graph LR
    subgraph DISPATCH["📤 Dispatcher → Worker"]
        TD["<b>TaskDispatch</b><br/>{task: {id, taskType, payload, metadata}}"]
        KILL["<b>Kill</b><br/>{taskId, reason}"]
        SHUT["<b>Shutdown</b><br/>{graceful: bool}"]
    end

    subgraph RESPONSE["📥 Worker → Dispatcher"]
        TR["<b>TaskResult</b><br/>{taskId, status, payload, durationMs}"]
        HB["<b>Heartbeat</b><br/>{activeTasks, capacity, uptimeSeconds}"]
        BP["<b>Backpressure</b><br/>{currentLoad, shouldThrottle}"]
        WR["<b>WorkerRegister</b><br/>{workerId, supportedTasks, maxConcurrency}"]
    end

    style DISPATCH fill:#1e1b4b,stroke:#6366f1,stroke-width:2px,color:#e0e7ff
    style RESPONSE fill:#052e16,stroke:#22c55e,stroke-width:2px,color:#dcfce7
    style TD fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style KILL fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style SHUT fill:#312e81,stroke:#818cf8,color:#e0e7ff
    style TR fill:#14532d,stroke:#4ade80,color:#dcfce7
    style HB fill:#14532d,stroke:#4ade80,color:#dcfce7
    style BP fill:#14532d,stroke:#4ade80,color:#dcfce7
    style WR fill:#14532d,stroke:#4ade80,color:#dcfce7

All field names use camelCase. Messages are JSON objects sent over WebSocket or, for stdio workers, one per line on stdin/stdout:

{"type": "TaskDispatch", "task": {"id": "...", "taskType": "scan", "payload": {...}}}
{"type": "TaskResult", "result": {"taskId": "...", "status": "Completed", "payload": {...}}}
{"type": "Heartbeat", "payload": {"workerId": "w1", "activeTasks": 2, "capacity": 10}}
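
As an illustration of the message shapes above, here is a minimal Python sketch of the worker side of the protocol with no SDK. It reads one `TaskDispatch` line and produces one `TaskResult` line. The function names, the default `workerId`, the `"Failed"` status string on the wire, and the `{"error": ...}` payload are assumptions; the README shows only `"Completed"` in a result message. Kill and Shutdown handling is left out.

```python
import json
import time


def make_task_result(task_id, status, payload, duration_ms, worker_id):
    """Serialize a TaskResult message using the camelCase field names shown above."""
    return json.dumps({
        "type": "TaskResult",
        "result": {
            "taskId": task_id,
            "status": status,
            "payload": payload,
            "durationMs": duration_ms,
            "workerId": worker_id,
        },
    })


def handle_line(line, handlers, worker_id="py-stdio"):
    """Process one JSON line from the dispatcher; return the reply line or None."""
    message = json.loads(line)
    if message.get("type") != "TaskDispatch":
        return None  # Kill and Shutdown are not handled in this sketch
    task = message["task"]
    started = time.monotonic()
    handler = handlers.get(task["taskType"])
    if handler is None:
        # Assumed failure shape: the README does not document error payloads.
        status, payload = "Failed", {"error": f"no handler for {task['taskType']}"}
    else:
        try:
            status, payload = "Completed", handler(task.get("payload"))
        except Exception as exc:
            status, payload = "Failed", {"error": str(exc)}
    duration_ms = int((time.monotonic() - started) * 1000)
    return make_task_result(task["id"], status, payload, duration_ms, worker_id)
```

A stdio worker would call `handle_line` for each line read from `sys.stdin` and print every non-`None` reply with `flush=True`, so the dispatcher is not left waiting on a buffered pipe.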

Transports

| Transport | Use case | Requires SDK? | Isolation |
|---|---|---|---|
| 🌐 WebSocket | Networked workers, auto-scaling pools | Yes | Process |
| 📟 stdio | Any CLI tool as a worker | No | Process |
| 🐳 Docker | Untrusted workloads, resource limits | No | Container |
| 🔑 SSH | Distribute across machines | No | Machine |
| ⚙️ WASM | Plugin systems, user-provided logic | No | Sandbox |

Safety

| Protection | Implementation |
|---|---|
| Command injection | Input validation on worker IDs, Docker images, hostnames, env vars |
| Resource exhaustion | Capacity-based routing, backpressure signaling |
| Hanging tasks | Dead worker detection fails pending tasks automatically |
| Data leakage | Per-worker task isolation, no shared state between workers |
| Denial of service | Rate limiting via max_concurrency per worker |
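
One common way to implement the command-injection row is allow-list validation: accept only characters known to be safe instead of trying to strip dangerous ones. The Python sketch below shows the idea for worker IDs, hostnames, and environment variable names. The patterns and function names are hypothetical and are not the rules rust-pipe actually applies.

```python
import re

# Hypothetical allow-list patterns; the crate's real rules are not documented in this README.
WORKER_ID_RE = re.compile(r"[A-Za-z0-9][A-Za-z0-9_.-]{0,63}")
HOSTNAME_LABEL_RE = re.compile(r"[A-Za-z0-9]([A-Za-z0-9-]{0,61}[A-Za-z0-9])?")
ENV_NAME_RE = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def is_valid_worker_id(value: str) -> bool:
    """Accept 1 to 64 characters: alphanumerics plus '_', '.', '-', starting with an alphanumeric."""
    return WORKER_ID_RE.fullmatch(value) is not None


def is_valid_hostname(value: str) -> bool:
    """Accept dot-separated DNS labels; rejects shell metacharacters and empty labels."""
    if not value or len(value) > 253:
        return False
    return all(HOSTNAME_LABEL_RE.fullmatch(label) for label in value.split("."))


def is_valid_env_name(value: str) -> bool:
    """Accept conventional environment variable names only."""
    return ENV_NAME_RE.fullmatch(value) is not None
```

`fullmatch` matters here: a partial match such as `re.match` would accept `worker-1; rm -rf /` because the prefix is valid.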

License

Licensed under either of:

at your option.