# Firq — Multi-tenant Scheduler for Rust Services

[![Crates.io (firq-core)](https://img.shields.io/crates/v/firq-core.svg)](https://crates.io/crates/firq-core)
[![Crates.io (firq-async)](https://img.shields.io/crates/v/firq-async.svg)](https://crates.io/crates/firq-async)
[![Crates.io (firq-tower)](https://img.shields.io/crates/v/firq-tower.svg)](https://crates.io/crates/firq-tower)
[![Docs.rs (firq-core)](https://docs.rs/firq-core/badge.svg)](https://docs.rs/firq-core)
[![Docs.rs (firq-async)](https://docs.rs/firq-async/badge.svg)](https://docs.rs/firq-async)
[![Docs.rs (firq-tower)](https://docs.rs/firq-tower/badge.svg)](https://docs.rs/firq-tower)
[![License: MIT OR Apache-2.0](https://img.shields.io/badge/license-MIT%20OR%20Apache--2.0-blue.svg)](#license)

**Firq** is an in-process scheduler for Rust backends that need stable tail latency under contention.

Key capabilities:

- Fair scheduling across tenants (DRR).
- Explicit backpressure and bounded memory behavior.
- Deadline-aware dequeue (`DropExpired` semantics).
- Metrics for queue saturation, drops, rejections, and queue-time percentiles.

`firq-core` is runtime-agnostic. `firq-async` adds Tokio support. `firq-tower` integrates with Tower/Axum.

## Install

### From crates.io

```toml
[dependencies]
firq-core = "0.1.3"
```

Tokio integration:

```toml
[dependencies]
firq-core = "0.1.3"
firq-async = "0.1.3"
```

Tower/Axum integration:

```toml
[dependencies]
firq-core = "0.1.3"
firq-async = "0.1.3"
firq-tower = "0.1.3"
```

Optional metrics helpers (`firq_core::prometheus`) live behind the `metrics` feature, which is enabled by default, so the standard dependency line is enough:

```toml
[dependencies]
firq-core = "0.1.3"
```

Disable metrics helpers if you only need scheduling primitives:

```toml
[dependencies]
firq-core = { version = "0.1.3", default-features = false }
```

### From source

```bash
git clone https://github.com/saulhs12/firq.git
cd firq
cargo build --workspace
```

## Quick start

```bash
cargo test -p firq-core
cargo test -p firq-async
cargo test -p firq-tower --test integration
cargo check -p firq-examples --bins
cargo run -p firq-examples --bin async
cargo run -p firq-examples --bin async_worker
```

## Stability / SemVer

- Firq is currently in `0.x`; minor releases may include API-breaking changes.
- Breaking changes include removing/renaming public types/functions, changing enum variants, or changing behavior in a way that requires code changes.
- Patch releases (`0.1.z`) aim to be backward compatible and focus on fixes/hardening.
- For production, pin a concrete release (`=0.1.3`) or a conservative range (`~0.1.3`) and review `CHANGELOG.md` before upgrades.
- MSRV: Rust `1.85+` (`rust-version = "1.85"` in `firq-core`, `firq-async`, and `firq-tower`).

## Docs & examples

- [`firq-core`](https://docs.rs/firq-core): core scheduler API and crate-level minimal example. `cargo add firq-core@0.1.3`
- [`firq-async`](https://docs.rs/firq-async): Tokio adapter and worker-backed consumer example. `cargo add firq-async@0.1.3`
- [`firq-tower`](https://docs.rs/firq-tower): Tower/Axum layer with header-based tenant extraction. `cargo add firq-tower@0.1.3`
- All three crates include copyable crate-level examples in `lib.rs` docs.

## Scheduler guarantees and non-guarantees

Guarantees:

- DRR scheduling gives active tenants recurring turns (no permanent starvation for runnable tenants).
- Queue limits (`max_global`, `max_per_tenant`) bound live pending work.
- Cancelled and deadline-expired entries are reclaimed lazily and do not permanently consume capacity.
- `stats()` counters are monotonic snapshots suitable for alerting and regression checks.
- Fairness is cost-weighted: each dequeue charges `task.cost` against tenant deficit, refilled by `quantum`.
- Priority is strict (`High` -> `Normal` -> `Low`) in dispatch order, so sustained `High` traffic can delay lower-priority work.

Non-guarantees:

- No strict global FIFO ordering across tenants.
- No cross-process fairness guarantee (scheduler is per-process/in-memory).
- No hard latency SLA by itself; tune `quantum`, `cost`, capacity, and worker parallelism with production traffic.
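
The lazy expiry behavior above can be sketched in std-only Rust. This is an illustrative model, not the firq implementation: `Item` is a stand-in for the real `firq_core::Task` (which also carries payload, priority, and cost), and the point is that expired entries are dropped at dequeue time rather than by a background sweeper, so they cannot permanently hold queue capacity.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Hypothetical minimal queue item for illustration only.
struct Item {
    deadline: Option<Instant>,
}

// Lazy expiry: expired items are skipped (and dropped) while scanning
// for the next live item, reclaiming their slots as a side effect.
fn dequeue_unexpired(queue: &mut VecDeque<Item>, now: Instant) -> Option<Item> {
    while let Some(item) = queue.pop_front() {
        match item.deadline {
            Some(d) if d <= now => continue, // expired: reclaim slot, keep scanning
            _ => return Some(item),
        }
    }
    None
}

fn main() {
    let now = Instant::now();
    let mut q = VecDeque::new();
    q.push_back(Item { deadline: Some(now - Duration::from_millis(1)) }); // already expired
    q.push_back(Item { deadline: None });
    let got = dequeue_unexpired(&mut q, now);
    assert!(got.is_some_and(|i| i.deadline.is_none()));
    assert!(q.is_empty());
    println!("expired item dropped lazily");
}
```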

## When to use / not use

Use Firq when:

- You need in-process multi-tenant fairness and backpressure in a Rust service.
- You want deadline-aware admission/dispatch and explicit queue saturation signals.
- You are integrating with sync workers, Tokio, or Tower/Axum middleware.

Do not use Firq when:

- You need durable or distributed queue semantics.
- You need fairness across multiple processes/nodes by itself.
- You need persistent job orchestration/retries as the primary concern.

## Scheduling flow

```text
producers -> enqueue(tenant, priority, cost, deadline)
          -> sharded per-tenant priority queues
          -> DRR selection (cost vs quantum)
          -> dequeue
          -> worker pool / async consumer / tower layer
```
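
The DRR selection step can be sketched as follows. This is a simplified, single-priority, single-shard model for intuition only; the real scheduler shards per-tenant queues and layers strict priority on top. Each turn credits a tenant `quantum` work units, and dequeuing a task charges its `cost`, so heavy tasks consume more of a tenant's turn.

```rust
use std::collections::{HashMap, VecDeque};

// Simplified deficit round robin over per-tenant queues (illustrative).
struct Drr {
    quantum: u64,
    deficits: HashMap<u64, u64>,
    queues: HashMap<u64, VecDeque<u64>>, // tenant -> queued task costs
    order: Vec<u64>,                     // round-robin visit order
}

impl Drr {
    fn next(&mut self) -> Option<(u64, u64)> {
        for _ in 0..self.order.len() {
            let tenant = self.order.remove(0);
            self.order.push(tenant);
            let queue = match self.queues.get_mut(&tenant) {
                Some(q) if !q.is_empty() => q,
                _ => {
                    // Idle tenants do not accumulate credit.
                    self.deficits.insert(tenant, 0);
                    continue;
                }
            };
            let deficit = self.deficits.entry(tenant).or_insert(0);
            *deficit += self.quantum; // refill this tenant's turn
            if let Some(&cost) = queue.front() {
                if cost <= *deficit {
                    *deficit -= cost; // charge the task's cost
                    queue.pop_front();
                    return Some((tenant, cost));
                }
            }
        }
        None
    }
}

fn main() {
    // Tenant 1 enqueues heavy tasks (cost 5), tenant 2 light ones (cost 1);
    // with quantum 5 both are served in alternation, neither starves.
    let mut drr = Drr {
        quantum: 5,
        deficits: HashMap::new(),
        queues: HashMap::from([(1, VecDeque::from([5, 5])), (2, VecDeque::from([1, 1]))]),
        order: vec![1, 2],
    };
    assert_eq!(drr.next(), Some((1, 5)));
    assert_eq!(drr.next(), Some((2, 1)));
    assert_eq!(drr.next(), Some((1, 5)));
    assert_eq!(drr.next(), Some((2, 1)));
    assert_eq!(drr.next(), None);
    println!("both tenants served under DRR");
}
```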

## How to choose parameters

Use these as starting points, then tune with real traffic and `stats()` metrics.

- `shards`:
  Start with `min(physical_cores, 8)`. Increase when many tenants are hot concurrently and enqueue lock contention is visible.
- `max_global` and `max_per_tenant`:
  Size from memory budget first. Approximate memory as `max_global * avg_task_size`. Keep `max_per_tenant` low enough that one tenant cannot monopolize memory.
- `quantum` and `cost`:
  Treat `cost` as "work units" per task, and `quantum` as work units granted per round. If heavy jobs are starving, increase `quantum` or lower heavy-job `cost` calibration.
- `deadlines`:
  Use when stale work should be discarded (timeouts/SLO breaches). Expired items are removed lazily and should not permanently consume queue limits.
- backpressure policy:
  `Reject` for strict admission control, `DropOldestPerTenant`/`DropNewestPerTenant` for lossy workloads, `Timeout` when producers can wait briefly for capacity.

Queue limits are enforced against live pending work. Cancelled/expired entries are compacted lazily on dequeue and enqueue maintenance passes.

Starting profiles (adjust with production traffic):

- Public API: `Reject` or short `Timeout` (5-20ms), conservative `max_per_tenant`, `in_flight_limit` near available CPU parallelism.
- Internal RPC/workers: `Reject` with moderate `max_per_tenant`, calibrate `cost` for heavy endpoints, increase `quantum` if heavy tasks starve.
- Lossy telemetry/batch edge: `DropOldestPerTenant` or `DropNewestPerTenant` based on whether freshness or completeness matters more.
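
The memory-first sizing above can be made concrete with a small worked example. The helper and the budget numbers here are illustrative, not part of the firq API; they just apply "approximate memory as `max_global * avg_task_size`" in reverse, starting from a budget.

```rust
// Illustrative sizing pass: derive queue limits from a memory budget,
// then keep any single tenant well below the global cap.
fn size_limits(
    memory_budget_bytes: u64,
    avg_task_size_bytes: u64,
    expected_hot_tenants: u64,
) -> (u64, u64) {
    // Invert: memory ~= max_global * avg_task_size.
    let max_global = memory_budget_bytes / avg_task_size_bytes;
    // Split the global cap across tenants expected to be hot at once,
    // so one tenant cannot monopolize memory.
    let max_per_tenant = (max_global / expected_hot_tenants).max(1);
    (max_global, max_per_tenant)
}

fn main() {
    // 64 MiB of queue budget, ~4 KiB average task, ~16 hot tenants.
    let (max_global, max_per_tenant) = size_limits(64 * 1024 * 1024, 4 * 1024, 16);
    assert_eq!(max_global, 16_384);
    assert_eq!(max_per_tenant, 1_024);
    println!("max_global={max_global} max_per_tenant={max_per_tenant}");
}
```

Treat the output as a starting point for `max_global`/`max_per_tenant`, then adjust against `stats()` saturation counters under real traffic.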

## Core usage (`firq-core`)

Use this when workers are synchronous (threads) or when direct control over dequeue loops is required.

```rust
use firq_core::{
    BackpressurePolicy, DequeueResult, EnqueueResult, Priority, Scheduler, SchedulerConfig, Task,
    TenantKey,
};
use std::collections::HashMap;
use std::time::Instant;

let scheduler = Scheduler::new(SchedulerConfig {
    shards: 4,
    max_global: 10_000,
    max_per_tenant: 1_000,
    quantum: 5,
    quantum_by_tenant: HashMap::new(),
    quantum_provider: None,
    backpressure: BackpressurePolicy::Reject,
    backpressure_by_tenant: HashMap::new(),
    top_tenants_capacity: 64,
});

let tenant = TenantKey::from(42);
let task = Task {
    payload: "work",
    enqueue_ts: Instant::now(),
    deadline: None,
    priority: Priority::Normal,
    cost: 1,
};

match scheduler.enqueue(tenant, task) {
    EnqueueResult::Enqueued => {}
    EnqueueResult::Rejected(reason) => {
        eprintln!("rejected: {reason:?}");
    }
    EnqueueResult::Closed => {
        eprintln!("scheduler closed");
    }
}

match scheduler.dequeue_blocking() {
    DequeueResult::Task { tenant, task } => {
        println!("tenant={} payload={}", tenant.as_u64(), task.payload);
    }
    DequeueResult::Empty => {}
    DequeueResult::Closed => {}
}

let stats = scheduler.stats();
println!("enqueued={} dequeued={}", stats.enqueued, stats.dequeued);
```

## Async usage (`firq-async`, Tokio)

Use this when producers/consumers are async and run under Tokio.

```rust
use firq_async::{
    AsyncScheduler, EnqueueResult, Priority, Scheduler, SchedulerConfig, Task, TenantKey,
};
use std::sync::Arc;
use std::time::Instant;

let core = Arc::new(Scheduler::new(SchedulerConfig::default()));
let scheduler = AsyncScheduler::new(core);

let tenant = TenantKey::from(7);
let task = Task {
    payload: "job",
    enqueue_ts: Instant::now(),
    deadline: None,
    priority: Priority::Normal,
    cost: 1,
};

match scheduler.enqueue(tenant, task) {
    EnqueueResult::Enqueued => {}
    EnqueueResult::Rejected(reason) => panic!("rejected: {reason:?}"),
    EnqueueResult::Closed => panic!("closed"),
}

// Recommended for steady consumers: dedicated worker mode.
let mut receiver = scheduler.receiver_with_worker(1024);
while let Some(item) = receiver.recv().await {
    println!(
        "tenant={} payload={}",
        item.tenant.as_u64(),
        item.task.payload
    );
}

// Fallback for one-off dequeue calls:
let _ = scheduler.dequeue_async().await;
```

## Axum usage (`firq-tower`)

`firq-tower` provides a Tower layer that handles scheduling, cancellation before turn, in-flight gating, and rejection mapping.

```rust
use axum::{extract::Request, routing::get, Router};
use firq_tower::{Firq, TenantKey};

let firq_layer = Firq::new()
    .with_shards(4)
    .with_max_global(1000)
    .with_max_per_tenant(100)
    .with_quantum(10)
    .with_in_flight_limit(128)
    .with_deadline_extractor::<Request, _>(|req| {
        req.headers()
            .get("X-Deadline-Ms")
            .and_then(|h| h.to_str().ok())
            .and_then(|v| v.parse::<u64>().ok())
            .map(|ms| std::time::Instant::now() + std::time::Duration::from_millis(ms))
    })
    .build(|req: &Request| {
        req.headers()
            .get("X-Tenant-ID")
            .and_then(|h| h.to_str().ok())
            .and_then(|s| s.parse::<u64>().ok())
            .or_else(|| {
                req.headers()
                    .get("Authorization")
                    .and_then(|h| h.to_str().ok())
                    .and_then(|raw| raw.strip_prefix("Bearer "))
                    .and_then(|token| token.strip_prefix("tenant:"))
                    .and_then(|claim| claim.parse::<u64>().ok())
            })
            .map(TenantKey::from)
            .unwrap_or(TenantKey::from(0))
    });

let app = Router::new()
    .route("/", get(|| async { "ok" }))
    .layer(firq_layer);
```

Default rejection mapping:

- `TenantFull` -> `429`
- `GlobalFull` -> `503`
- `Timeout` -> `503`

## Actix-web usage (manual scheduling gate)

Firq does not currently provide a first-party `firq-actix` crate.

Use `firq-async` in handlers or middleware to gate work before executing heavy logic:

```rust
use actix_web::{web, HttpRequest, HttpResponse};
use firq_async::{AsyncScheduler, DequeueResult, EnqueueResult, Priority, Task, TenantKey};
use std::time::Instant;

struct Permit;

#[derive(Clone)]
struct AppState {
    scheduler: AsyncScheduler<Permit>,
}

async fn guarded_handler(
    state: web::Data<AppState>,
    req: HttpRequest,
) -> actix_web::Result<HttpResponse> {
    let tenant = req
        .headers()
        .get("X-Tenant-ID")
        .and_then(|v| v.to_str().ok())
        .and_then(|v| v.parse::<u64>().ok())
        .map(TenantKey::from)
        .unwrap_or(TenantKey::from(0));

    let task = Task {
        payload: Permit,
        enqueue_ts: Instant::now(),
        deadline: None,
        priority: Priority::Normal,
        cost: 1,
    };

    match state.scheduler.enqueue(tenant, task) {
        EnqueueResult::Enqueued => {}
        EnqueueResult::Rejected(_) => return Ok(HttpResponse::TooManyRequests().finish()),
        EnqueueResult::Closed => return Ok(HttpResponse::ServiceUnavailable().finish()),
    }

    match state.scheduler.dequeue_async().await {
        DequeueResult::Task { .. } => Ok(HttpResponse::Ok().body("ok")),
        DequeueResult::Closed => Ok(HttpResponse::ServiceUnavailable().finish()),
        DequeueResult::Empty => Ok(HttpResponse::InternalServerError().finish()),
    }
}
```

Runnable Actix example in this repository:

- `crates/firq-examples/src/bin/actix_web.rs`

## Benchmarks

Run reproducible scenarios:

```bash
cargo run --release -p firq-bench
```

Quick smoke run (single scenario, short duration):

```bash
FIRQ_BENCH_SCENARIO=capacity_pressure FIRQ_BENCH_SECONDS=2 \
  cargo run --release -p firq-bench
```

No extra features are required for benchmark runs.

Scenarios in the benchmark binary include (real names):

- `hot_tenant_sustained`
- `deadline_expiration`
- `capacity_pressure`

What to observe (without relying on fixed numbers):

- Fairness: in `hot_tenant_sustained`, cold tenants should still be served.
- Queue-time behavior: compare p95/p99 queue-time trends between schedulers.
- Backpressure/expiry signals: in `capacity_pressure` and `deadline_expiration`, verify drop/reject/expired counters respond as load changes.

Use runs to compare parameter sets and to detect regressions; do not treat a single run as a universal baseline.

## Repository layout

- `crates/firq-core`: scheduler engine.
- `crates/firq-async`: Tokio adapter.
- `crates/firq-tower`: Tower layer.
- `crates/firq-examples`: runnable examples (`publish = false`).
- `crates/firq-bench`: benchmark runner (`publish = false`).

## Community and governance

- Contribution guide: `CONTRIBUTING.md`
- Security policy: `SECURITY.md`
- Support channels: `SUPPORT.md`
- Release process: `RELEASING.md`

## Local quality gates

```bash
cargo fmt --all -- --check
cargo clippy --workspace --all-targets --all-features -- -D warnings
cargo test -p firq-core
cargo test -p firq-async
cargo test -p firq-tower --test integration
cargo check -p firq-examples --bins
```

Release dry-runs:

```bash
cargo publish --dry-run -p firq-core
# after firq-core is published on crates.io:
cargo publish --dry-run -p firq-async
# after firq-async is published on crates.io:
cargo publish --dry-run -p firq-tower
```


## License

This project is dual-licensed under:

- MIT (`LICENSE-MIT`)
- Apache-2.0 (`LICENSE-APACHE`)