
Firq — Multi-tenant Scheduler for Rust Services


Firq is an in-process scheduler for Rust backends that need stable tail latency under contention.

Key capabilities:

  • Fair scheduling across tenants (deficit round robin, DRR).
  • Explicit backpressure and bounded memory behavior.
  • Deadline-aware dequeue (DropExpired semantics).
  • Metrics for queue saturation, drops, rejections, and queue-time percentiles.

firq-core is runtime-agnostic. firq-async adds Tokio support. firq-tower integrates with Tower/Axum.

Install

From crates.io

[dependencies]
firq-core = "0.1.3"

Tokio integration:

[dependencies]
firq-core = "0.1.3"
firq-async = "0.1.3"

Tower/Axum integration:

[dependencies]
firq-core = "0.1.3"
firq-async = "0.1.3"
firq-tower = "0.1.3"

Optional metrics helpers (firq_core::prometheus) are gated behind the metrics feature, which is enabled by default:

[dependencies]
firq-core = "0.1.3"

Disable metrics helpers if you only need scheduling primitives:

[dependencies]
firq-core = { version = "0.1.3", default-features = false }

From source

git clone https://github.com/saulhs12/firq.git
cd firq
cargo build --workspace

Quick start

cargo test -p firq-core
cargo test -p firq-async
cargo test -p firq-tower --test integration
cargo check -p firq-examples --bins
cargo run -p firq-examples --bin async
cargo run -p firq-examples --bin async_worker

Stability / SemVer

  • Firq is currently in 0.x; minor releases may include API-breaking changes.
  • Breaking changes include removing/renaming public types/functions, changing enum variants, or changing behavior in a way that requires code changes.
  • Patch releases (0.1.z) aim to be backward compatible and focus on fixes/hardening.
  • For production, pin a concrete release (=0.1.3) or a conservative range (~0.1.3) and review CHANGELOG.md before upgrades.
  • MSRV: Rust 1.85+ (rust-version = "1.85" in firq-core, firq-async, and firq-tower).
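
The pinning strategies above, expressed as Cargo version requirements (crate choice here is just an example):

```toml
[dependencies]
# Exact pin: only 0.1.3 satisfies this requirement.
firq-core = "=0.1.3"

# Tilde range: accepts 0.1.3 <= version < 0.2.0 (patch upgrades only).
firq-async = "~0.1.3"
```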

Docs & examples

  • firq-core: core scheduler API and crate-level minimal example. cargo add firq-core@0.1.3
  • firq-async: Tokio adapter and worker-backed consumer example. cargo add firq-async@0.1.3
  • firq-tower: Tower/Axum layer with header-based tenant extraction. cargo add firq-tower@0.1.3
  • All three crates include copyable crate-level examples in lib.rs docs.

Scheduler guarantees and non-guarantees

Guarantees:

  • DRR scheduling gives active tenants recurring turns (no permanent starvation for runnable tenants).
  • Queue limits (max_global, max_per_tenant) bound live pending work.
  • Cancelled and deadline-expired entries are reclaimed lazily and should not permanently consume queue capacity.
  • stats() counters are monotonic snapshots suitable for alerting and regression checks.
  • Fairness is cost-weighted: each dequeue charges task.cost against tenant deficit, refilled by quantum.
  • Priority is strict (High -> Normal -> Low) in dispatch order, so sustained High traffic can delay lower-priority work.

Non-guarantees:

  • No strict global FIFO ordering across tenants.
  • No cross-process fairness guarantee (scheduler is per-process/in-memory).
  • No hard latency SLA by itself; tune quantum, cost, capacity, and worker parallelism with production traffic.

When to use / not use

Use Firq when:

  • You need in-process multi-tenant fairness and backpressure in a Rust service.
  • You want deadline-aware admission/dispatch and explicit queue saturation signals.
  • You are integrating with sync workers, Tokio, or Tower/Axum middleware.

Do not use Firq when:

  • You need durable or distributed queue semantics.
  • You need fairness across multiple processes/nodes by itself.
  • You need persistent job orchestration/retries as the primary concern.

Scheduling flow

producers -> enqueue(tenant, priority, cost, deadline)
          -> sharded per-tenant priority queues
          -> DRR selection (cost vs quantum)
          -> dequeue
          -> worker pool / async consumer / tower layer

How to choose parameters

Use these as starting points, then tune with real traffic and stats() metrics.

  • shards: Start with min(physical_cores, 8). Increase when many tenants are hot concurrently and enqueue lock contention is visible.
  • max_global and max_per_tenant: Size from memory budget first. Approximate memory as max_global * avg_task_size. Keep max_per_tenant low enough that one tenant cannot monopolize memory.
  • quantum and cost: Treat cost as "work units" per task, and quantum as the work units granted to a tenant per round. If heavy jobs are starving, increase quantum or lower the cost assigned to heavy jobs.
  • deadlines: Use when stale work should be discarded (timeouts/SLO breaches). Expired items are removed lazily and should not permanently consume queue limits.
  • backpressure policy: Use Reject for strict admission control, DropOldestPerTenant or DropNewestPerTenant for lossy workloads, and Timeout when producers can wait briefly for capacity.

Queue limits are enforced against live pending work. Cancelled/expired entries are compacted lazily on dequeue and enqueue maintenance passes.

Starting profiles (adjust with production traffic):

  • Public API: Reject or short Timeout (5-20ms), conservative max_per_tenant, in_flight_limit near available CPU parallelism.
  • Internal RPC/workers: Reject with moderate max_per_tenant, calibrate cost for heavy endpoints, increase quantum if heavy tasks starve.
  • Lossy telemetry/batch edge: DropOldestPerTenant or DropNewestPerTenant based on whether freshness or completeness matters more.
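
As one example, the "public API" profile mapped onto the SchedulerConfig fields used elsewhere in this README. All numbers are illustrative starting points to tune against stats() under production traffic, not recommendations:

```rust
use firq_core::{BackpressurePolicy, SchedulerConfig};
use std::collections::HashMap;

// Conservative public-API starting profile: strict admission control
// and a low per-tenant cap relative to the global limit.
let public_api_profile = SchedulerConfig {
    shards: 4,
    max_global: 10_000,
    max_per_tenant: 200, // conservative: one tenant gets at most 2% of capacity
    quantum: 5,
    quantum_by_tenant: HashMap::new(),
    quantum_provider: None,
    backpressure: BackpressurePolicy::Reject, // strict admission control
    backpressure_by_tenant: HashMap::new(),
    top_tenants_capacity: 64,
};
```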

Core usage (firq-core)

Use this when workers are synchronous (threads) or when direct control over dequeue loops is required.

use firq_core::{
    BackpressurePolicy, DequeueResult, EnqueueResult, Priority, Scheduler, SchedulerConfig, Task,
    TenantKey,
};
use std::collections::HashMap;
use std::time::Instant;

let scheduler = Scheduler::new(SchedulerConfig {
    shards: 4,
    max_global: 10_000,
    max_per_tenant: 1_000,
    quantum: 5,
    quantum_by_tenant: HashMap::new(),
    quantum_provider: None,
    backpressure: BackpressurePolicy::Reject,
    backpressure_by_tenant: HashMap::new(),
    top_tenants_capacity: 64,
});

let tenant = TenantKey::from(42);
let task = Task {
    payload: "work",
    enqueue_ts: Instant::now(),
    deadline: None,
    priority: Priority::Normal,
    cost: 1,
};

match scheduler.enqueue(tenant, task) {
    EnqueueResult::Enqueued => {}
    EnqueueResult::Rejected(reason) => {
        eprintln!("rejected: {reason:?}");
    }
    EnqueueResult::Closed => {
        eprintln!("scheduler closed");
    }
}

match scheduler.dequeue_blocking() {
    DequeueResult::Task { tenant, task } => {
        println!("tenant={} payload={}", tenant.as_u64(), task.payload);
    }
    DequeueResult::Empty => {}
    DequeueResult::Closed => {}
}

let stats = scheduler.stats();
println!("enqueued={} dequeued={}", stats.enqueued, stats.dequeued);

Async usage (firq-async, Tokio)

Use this when producers/consumers are async and run under Tokio.

use firq_async::{
    AsyncScheduler, EnqueueResult, Priority, Scheduler, SchedulerConfig, Task, TenantKey,
};
use std::sync::Arc;
use std::time::Instant;

let core = Arc::new(Scheduler::new(SchedulerConfig::default()));
let scheduler = AsyncScheduler::new(core);

let tenant = TenantKey::from(7);
let task = Task {
    payload: "job",
    enqueue_ts: Instant::now(),
    deadline: None,
    priority: Priority::Normal,
    cost: 1,
};

match scheduler.enqueue(tenant, task) {
    EnqueueResult::Enqueued => {}
    EnqueueResult::Rejected(reason) => panic!("rejected: {reason:?}"),
    EnqueueResult::Closed => panic!("closed"),
}

// Recommended for steady consumers: dedicated worker mode.
let mut receiver = scheduler.receiver_with_worker(1024);
while let Some(item) = receiver.recv().await {
    println!(
        "tenant={} payload={}",
        item.tenant.as_u64(),
        item.task.payload
    );
}

// Fallback for one-off dequeue calls:
let _ = scheduler.dequeue_async().await;

Axum usage (firq-tower)

firq-tower provides a Tower layer that handles scheduling, cancellation of requests dropped before their turn, in-flight gating, and rejection mapping.

use axum::{extract::Request, routing::get, Router};
use firq_tower::{Firq, TenantKey};

let firq_layer = Firq::new()
    .with_shards(4)
    .with_max_global(1000)
    .with_max_per_tenant(100)
    .with_quantum(10)
    .with_in_flight_limit(128)
    .with_deadline_extractor::<Request, _>(|req| {
        req.headers()
            .get("X-Deadline-Ms")
            .and_then(|h| h.to_str().ok())
            .and_then(|v| v.parse::<u64>().ok())
            .map(|ms| std::time::Instant::now() + std::time::Duration::from_millis(ms))
    })
    .build(|req: &Request| {
        req.headers()
            .get("X-Tenant-ID")
            .and_then(|h| h.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .or_else(|| {
                req.headers()
                    .get("Authorization")
                    .and_then(|h| h.to_str().ok())
                    .and_then(|raw| raw.strip_prefix("Bearer "))
                    .and_then(|token| token.strip_prefix("tenant:"))
                    .and_then(|claim| claim.parse::<u64>().ok())
            })
            .map(TenantKey::from)
            .unwrap_or(TenantKey::from(0))
    });

let app = Router::new()
    .route("/", get(|| async { "ok" }))
    .layer(firq_layer);

Default rejection mapping:

  • TenantFull -> 429
  • GlobalFull -> 503
  • Timeout -> 503

Actix-web usage (manual scheduling gate)

Firq does not currently provide a first-party firq-actix crate.

Use firq-async in handlers or middleware to gate work before executing heavy logic:

use actix_web::{web, HttpRequest, HttpResponse};
use firq_async::{AsyncScheduler, DequeueResult, EnqueueResult, Priority, Task, TenantKey};
use std::time::Instant;

struct Permit;

#[derive(Clone)]
struct AppState {
    scheduler: AsyncScheduler<Permit>,
}

async fn guarded_handler(
    state: web::Data<AppState>,
    req: HttpRequest,
) -> actix_web::Result<HttpResponse> {
    let tenant = req
        .headers()
        .get("X-Tenant-ID")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.parse::<u64>().ok())
        .map(TenantKey::from)
        .unwrap_or(TenantKey::from(0));

    let task = Task {
        payload: Permit,
        enqueue_ts: Instant::now(),
        deadline: None,
        priority: Priority::Normal,
        cost: 1,
    };

    match state.scheduler.enqueue(tenant, task) {
        EnqueueResult::Enqueued => {}
        EnqueueResult::Rejected(_) => return Ok(HttpResponse::TooManyRequests().finish()),
        EnqueueResult::Closed => return Ok(HttpResponse::ServiceUnavailable().finish()),
    }

    match state.scheduler.dequeue_async().await {
        DequeueResult::Task { .. } => Ok(HttpResponse::Ok().body("ok")),
        DequeueResult::Closed => Ok(HttpResponse::ServiceUnavailable().finish()),
        DequeueResult::Empty => Ok(HttpResponse::InternalServerError().finish()),
    }
}

Runnable Actix example in this repository:

  • crates/firq-examples/src/bin/actix_web.rs

Benchmarks

Run reproducible scenarios:

cargo run --release -p firq-bench

Quick smoke run (single scenario, short duration):

FIRQ_BENCH_SCENARIO=capacity_pressure FIRQ_BENCH_SECONDS=2 \
  cargo run --release -p firq-bench

No extra features are required for benchmark runs.

The benchmark binary includes these scenarios (literal scenario names):

  • hot_tenant_sustained
  • deadline_expiration
  • capacity_pressure

What to observe (without relying on fixed numbers):

  • Fairness: in hot_tenant_sustained, cold tenants should still be served.
  • Queue-time behavior: compare p95/p99 queue-time trends between schedulers.
  • Backpressure/expiry signals: in capacity_pressure and deadline_expiration, verify drop/reject/expired counters respond as load changes.

Use runs to compare parameter sets and to detect regressions; do not treat a single run as a universal baseline.

Repository layout

  • crates/firq-core: scheduler engine.
  • crates/firq-async: Tokio adapter.
  • crates/firq-tower: Tower layer.
  • crates/firq-examples: runnable examples (publish = false).
  • crates/firq-bench: benchmark runner (publish = false).

Community and governance

  • Contribution guide: CONTRIBUTING.md
  • Security policy: SECURITY.md
  • Support channels: SUPPORT.md
  • Release process: RELEASING.md

Local quality gates

cargo fmt --all -- --check
cargo clippy --workspace --all-targets --all-features -- -D warnings
cargo test -p firq-core
cargo test -p firq-async
cargo test -p firq-tower --test integration
cargo check -p firq-examples --bins

Release dry-runs:

cargo publish --dry-run -p firq-core
# after firq-core is published on crates.io:
cargo publish --dry-run -p firq-async
# after firq-async is published on crates.io:
cargo publish --dry-run -p firq-tower

License

This project is dual-licensed under:

  • MIT (LICENSE-MIT)
  • Apache-2.0 (LICENSE-APACHE)