# distkit

A toolkit of distributed systems primitives for Rust, backed by Redis.
## What is distkit?
distkit provides building blocks for distributed applications. It ships distributed counters (strict and lax), instance-aware counters, and rate limiting, all backed by Redis.
## Features
- `StrictCounter` -- every operation executes a Redis Lua script atomically. Reads always reflect the latest write. Best for billing, inventory, or anything where accuracy is critical.
- `LaxCounter` -- buffers increments in memory and flushes to Redis every ~20 ms. Sub-microsecond latency on the hot path. Best for analytics and high-throughput metrics.
- Instance-aware counters -- each running instance owns a named slice of the total, with automatic cleanup of contributions from instances that stop heartbeating.
- Rate limiting (opt-in `trypema` feature) -- sliding-window rate limiting with local, Redis-backed, and hybrid providers. Supports absolute and probabilistic suppression strategies.
- Safe by default -- `#![forbid(unsafe_code)]`, no panics in library code.
## Feature flags
| Feature | Default | Description |
|---|---|---|
| `counter` | yes | Distributed counters (`StrictCounter`, `LaxCounter`) |
| `instance-aware-counter` | no | Per-instance counters (`StrictInstanceAwareCounter`, `LaxInstanceAwareCounter`) |
| `trypema` | no | Rate limiting via the `trypema` crate |
## Installation
Install with `cargo add distkit`, or add to `Cargo.toml`:
```toml
[dependencies]
distkit = "0.2"
```
To enable instance-aware counters or rate limiting:
```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter", "trypema"] }
```
distkit requires a running Redis instance (5.0 or newer; the counters are implemented with Redis Lua scripting).
## Quick start

A minimal example. The Redis URL and key names are illustrative, and the constructor call is a sketch of the API described in the sections below:

```rust
use distkit::{RedisKey, StrictCounter};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let conn = client.get_connection_manager().await?;

    let counter = StrictCounter::new(conn, RedisKey::try_from("myapp")?);

    let key = RedisKey::try_from("page_views")?;
    counter.inc(&key, 1).await?;

    Ok(())
}
```
## Counter types
### StrictCounter
Every call is a single Redis round-trip executing an atomic Lua script. The counter value is always authoritative.
With a `StrictCounter` named `strict` (key name and values illustrative):

```rust
let key = RedisKey::try_from("inventory")?;

strict.inc(&key, 5).await?;          // HINCRBY via Lua
strict.set(&key, 100).await?;        // HSET via Lua
let old = strict.del(&key).await?;   // HDEL, returns old value
strict.clear(&key).await?;           // DEL on the hash
```
### LaxCounter
Writes are buffered in a local `DashMap` and flushed to Redis in batched
pipelines every `allowed_lag` (default 20 ms). Reads return the local view
(`remote_total + pending_delta`), which is always consistent within the same
process.
With a `LaxCounter` named `lax` (key name illustrative):

```rust
let key = RedisKey::try_from("events")?;

lax.inc(&key, 1).await?;         // local atomic add, sub-microsecond
let val = lax.get(&key).await?;  // reads local state, no Redis hit
```
A background Tokio task handles flushing. It holds a `Weak` reference to the
counter, so it stops automatically when the counter is dropped.
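The lifecycle trick above (a flusher that holds only a `Weak` handle) can be sketched generically. This is an illustrative model, not distkit's internals, and it uses a plain thread where distkit uses a Tokio task:

```rust
use std::sync::atomic::{AtomicI64, Ordering};
use std::sync::{Arc, Weak};
use std::thread;
use std::time::Duration;

struct Inner {
    pending_delta: AtomicI64, // increments buffered since the last flush
}

fn spawn_flusher(inner: Weak<Inner>, allowed_lag: Duration) -> thread::JoinHandle<()> {
    thread::spawn(move || loop {
        thread::sleep(allowed_lag);
        // If every strong handle is gone, the counter was dropped:
        // upgrade() fails and the flusher exits on its own.
        let Some(inner) = inner.upgrade() else { break };
        let delta = inner.pending_delta.swap(0, Ordering::AcqRel);
        if delta != 0 {
            // In the real crate this would be a pipelined Redis write.
            println!("flushing delta {delta}");
        }
    })
}
```

Because the task only upgrades the `Weak` for the duration of one flush, dropping the last strong handle is enough to stop the loop.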
### Choosing a counter

| | StrictCounter | LaxCounter | StrictInstanceAwareCounter | LaxInstanceAwareCounter |
|---|---|---|---|---|
| Consistency | Immediate | Eventual (default: ~20 ms lag) | Immediate | Eventual (`flush_interval` lag) |
| `inc` latency | Redis round-trip | Sub-microsecond (warm path) | Redis round-trip | Sub-microsecond (warm path) |
| Redis I/O | Every operation | Batched on interval | Every `inc` | Batched on interval |
| `set` / `del` | Immediate | Immediate | Immediate (bumps epoch) | Flushes pending delta, then immediate |
| Per-instance tracking | No | No | Yes | Yes |
| Dead-instance cleanup | No | No | Yes | Yes |
| Feature flag | `counter` (default) | `counter` (default) | `instance-aware-counter` | `instance-aware-counter` |
| Use case | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |
## Instance-aware counters
Enable the instance-aware-counter feature:
```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter"] }
```
Instance-aware counters track each running instance's contribution separately.
The cumulative total is the sum of all live instances. When an instance stops
heartbeating for longer than `dead_instance_threshold_ms` (default 30 s), its
contribution is automatically subtracted from the cumulative on the next
operation by any surviving instance.
This makes them well-suited for:
- Connection pool sizing -- each server reports its active connection count; the cumulative is the cluster-wide total.
- Live session counting -- contributions disappear naturally when a node restarts or crashes.
- Per-node metrics -- see both the global total and each instance's slice.
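The cleanup rule can be modeled in a few lines. This is a deliberately simplified, in-memory stand-in for what distkit does with Redis hashes and heartbeat timestamps:

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

struct Slice {
    count: i64,
    last_heartbeat: Instant,
}

/// Sum only the slices whose owner has heartbeated recently;
/// stale contributions are dropped from the total.
fn cumulative(slices: &HashMap<String, Slice>, now: Instant, dead_after: Duration) -> i64 {
    slices
        .values()
        .filter(|s| now.duration_since(s.last_heartbeat) < dead_after)
        .map(|s| s.count)
        .sum()
}
```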
### StrictInstanceAwareCounter
Every call is immediately consistent with Redis. `set` and `del` bump a
per-key epoch that causes stale instances to reset their stored count on
their next operation, preventing double-counting.
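The epoch rule can be sketched as a tiny state machine (illustrative, not the actual Lua implementation):

```rust
#[derive(Clone, Copy, Debug, PartialEq)]
struct InstanceSlice {
    count: i64,
    epoch: u64, // epoch this slice last observed
}

/// Before applying an operation, a slice written under an older epoch
/// is reset, so values from before a global set/del are never re-counted.
fn reconcile(slice: InstanceSlice, current_epoch: u64) -> InstanceSlice {
    if slice.epoch < current_epoch {
        InstanceSlice { count: 0, epoch: current_epoch }
    } else {
        slice
    }
}
```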
A sketch of the API (the URL, prefix, key, instance name, and options type are illustrative):

```rust
use distkit::{RedisKey, StrictInstanceAwareCounter};

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;

let prefix = RedisKey::try_from("myapp")?;
let counter = StrictInstanceAwareCounter::new(conn, prefix, "server-1", InstanceAwareOptions::default());

let key = RedisKey::try_from("connections")?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (cumulative, mine) = counter.inc(&key, 1).await?;

// Decrement this instance's contribution.
let (cumulative, mine) = counter.dec(&key, 1).await?;

// Read without modifying.
let (cumulative, mine) = counter.get(&key).await?;

// Set this instance's slice to an exact value without bumping the epoch.
let (cumulative, mine) = counter.set_on_instance(&key, 42).await?;

// Set the global total to an exact value and bump the epoch.
let cumulative = counter.set(&key, 100).await?;

// Remove only this instance's contribution.
let (cumulative, mine) = counter.del_on_instance(&key).await?;

// Delete the key globally and bump the epoch.
let old = counter.del(&key).await?;
```
### Dead-instance cleanup
Each instance sends a heartbeat on every operation. If a process silently dies, surviving instances automatically remove its contribution the next time any of them touches the same key.
A sketch (instance names, the options type, and values are illustrative):

```rust
use distkit::{RedisKey, StrictInstanceAwareCounter};

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn1 = client.get_connection_manager().await?;
let conn2 = client.get_connection_manager().await?;

let prefix = RedisKey::try_from("myapp")?;
let key = RedisKey::try_from("sessions")?;

// Options with the default dead_instance_threshold_ms of 30 s.
let opts = InstanceAwareOptions::default();

let server_a = StrictInstanceAwareCounter::new(conn1, prefix.clone(), "server-a", opts.clone());
let server_b = StrictInstanceAwareCounter::new(conn2, prefix, "server-b", opts);

server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key, 5).await?;  // cumulative = 15

// server_a goes offline. After 30 s, server_b's next call removes its
// contribution automatically.
let (cumulative, _) = server_b.get(&key).await?; // total = 5 once cleaned up
```
### LaxInstanceAwareCounter
A buffered wrapper around `StrictInstanceAwareCounter`. `inc` calls accumulate
locally and are flushed to the strict counter in bulk every `flush_interval`
(default 20 ms). Global operations (`set`, `del`, `clear`) flush any pending
delta first, then delegate immediately.
Use this when you have many inc/dec calls per second and can tolerate a
small consistency lag.
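The flush-then-delegate ordering for global operations can be sketched with a simplified, single-threaded model (not distkit's implementation):

```rust
/// Simplified model of the lax wrapper: a buffered delta plus a "remote" total.
struct LaxModel {
    remote: i64,        // stand-in for the strict counter's value in Redis
    pending_delta: i64, // local increments not yet flushed
}

impl LaxModel {
    fn inc(&mut self, by: i64) {
        self.pending_delta += by; // stays local until the next flush
    }

    fn flush(&mut self) {
        self.remote += self.pending_delta;
        self.pending_delta = 0;
    }

    /// Global set flushes the pending delta first, then applies immediately,
    /// so a later flush can never resurrect pre-set increments.
    fn set(&mut self, value: i64) {
        self.flush();
        self.remote = value;
    }

    fn get(&self) -> i64 {
        self.remote + self.pending_delta // the local estimate
    }
}
```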
A sketch (URL, names, and constructor arguments are illustrative):

```rust
use std::time::Duration;

use distkit::{LaxInstanceAwareCounter, RedisKey};

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;

let prefix = RedisKey::try_from("myapp")?;
let counter = LaxInstanceAwareCounter::new(conn, prefix, "node-1", Duration::from_millis(20));

let key = RedisKey::try_from("throughput")?;

// Returns the local estimate immediately -- no Redis round-trip on warm path.
let (cumulative, mine) = counter.inc(&key, 1).await?;

// Decrement also stays local until flushed.
let (cumulative, mine) = counter.dec(&key, 1).await?;

// get() also returns the local estimate (cumulative + pending delta).
let (cumulative, mine) = counter.get(&key).await?;
```
## Rate limiting (trypema)
Enable the trypema feature to access sliding-window rate limiting.
Trypema documentation website: https://trypema.davidoyinbo.com
```toml
[dependencies]
distkit = { version = "0.2", features = ["trypema"] }
```
All public types from the trypema crate are re-exported under `distkit::trypema`. The module provides:
- Sliding-window rate limiting with configurable window size and rate.
- Three providers -- local (in-process), Redis-backed (distributed), and hybrid (local fast-path with periodic Redis sync).
- Two strategies -- absolute (binary allow/reject) and suppressed (probabilistic degradation that smoothly ramps rejection probability).
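One way to picture the suppressed strategy (an illustrative ramp, not trypema's actual formula): below the configured rate nothing is rejected, and past it the rejection probability grows linearly until, at twice the rate, every request is rejected.

```rust
/// Illustrative probabilistic suppression: 0 below the limit,
/// ramping linearly to 1.0 at twice the limit.
fn rejection_probability(current: u64, limit: u64) -> f64 {
    if current <= limit {
        0.0
    } else if current >= 2 * limit {
        1.0
    } else {
        (current - limit) as f64 / limit as f64
    }
}
```

Compared with the absolute strategy's hard cutoff, a ramp like this degrades traffic smoothly instead of flipping from all-allow to all-reject at the boundary.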
### Local rate limiting (absolute)

A sketch (the constructor, the decision variants, and the `inc` arguments are illustrative; see the trypema docs for the real API):

```rust
use std::sync::Arc;

use distkit::trypema::{Decision, Rate, RateLimiter};

let rl = Arc::new(RateLimiter::default());
rl.run_cleanup_loop();

let rate = Rate::try_from(10).unwrap(); // 10 requests per second

match rl.local.absolute.inc("client-42", &rate) {
    Decision::Allow => { /* proceed */ }
    Decision::Reject => { /* rate limited */ }
}
```
### Redis-backed rate limiting
For distributed enforcement across multiple processes or servers:
A sketch (the window/bucket sizes, key, rate, and type names are illustrative):

```rust
use std::sync::Arc;
use std::time::Duration;

use distkit::trypema::{Bucket, Key, Rate, RateLimiter, Window};

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;

let window = Window::try_from(Duration::from_secs(1))?;
let bucket = Bucket::try_from(Duration::from_millis(100))?;

let rl = Arc::new(RateLimiter::new(conn, window, bucket));
rl.run_cleanup_loop();

let key = Key::try_from("api:client-42")?;
let rate = Rate::try_from(100)?;

// Distributed absolute enforcement
let decision = rl.redis.absolute.inc(&key, &rate).await?;

// Or use the hybrid provider for local fast-path with Redis sync
let decision = rl.hybrid.absolute.inc(&key, &rate).await?;
```
See the trypema documentation for full API details and advanced configuration.
## Development

### Prerequisites
- Rust (latest stable)
- Docker (for the test Redis instance)
### Commands

Tests and benchmarks require the `REDIS_URL` environment variable.
The make targets set this automatically.
## License
MIT