A toolkit of distributed systems primitives for Rust, backed by Redis.
distkit (DISTributed system KIT) provides building blocks for distributed applications. The crate
currently offers three modules, all of which run on the tokio runtime:
- Counters (`counter` feature, enabled by default) – two counter implementations (strict and lax) that share a common async trait, letting you choose between immediate consistency and high-throughput eventual consistency.
- Instance-aware counters (`instance-aware-counter` feature) – counters where each running process owns a named slice of the total, with automatic cleanup of contributions from processes that stop heartbeating. For example, in a cluster where each server reports its active connection count, the cumulative is the cluster-wide total; when a server dies, its contribution is automatically subtracted, and if it was only temporarily offline, the contribution is added back when it returns.
- Rate limiting (`trypema` feature, opt-in) – re-exports the `trypema` crate, providing sliding-window rate limiting with local, Redis-backed, and hybrid providers.
§Feature flags
| Feature | Default | Description |
|---|---|---|
| `counter` | yes | Distributed counters (`StrictCounter`, `LaxCounter`) |
| `instance-aware-counter` | no | Instance-aware distributed counters (`StrictInstanceAwareCounter`, `LaxInstanceAwareCounter`) |
| `trypema` | no | Rate limiting via the `trypema` crate |
§Quick start
```rust
// Servers sharing the same prefix coordinate through the same Redis keys.
// `conn` is a pre-established Redis connection manager.
let prefix = RedisKey::try_from("my_app".to_string())?;
let options = CounterOptions::new(prefix, conn);

// Strict: every call hits Redis immediately.
let strict = StrictCounter::new(options.clone());
let key = RedisKey::try_from("page_views".to_string())?;
strict.inc(&key, 1).await?;
let total = strict.get(&key).await?;

// Lax: buffered in memory, flushed to Redis every ~20 ms.
let lax = LaxCounter::new(options);
lax.inc(&key, 1).await?;
let approx = lax.get(&key).await?;
```
§Choosing a counter
| | `StrictCounter` | `LaxCounter` | `StrictInstanceAwareCounter` | `LaxInstanceAwareCounter` |
|---|---|---|---|---|
| Consistency | Immediate | Eventual (default: ~20 ms lag) | Immediate | Eventual (`flush_interval` lag) |
| `inc` latency | Redis round-trip | Sub-microsecond (warm path) | Redis round-trip | Sub-microsecond (warm path) |
| Redis I/O | Every operation | Batched on interval | Every `inc` | Batched on interval |
| `set` / `del` | Immediate | Immediate | Immediate (bumps epoch) | Flushes pending delta, then immediate |
| Per-instance tracking | No | No | Yes | Yes |
| Dead-instance cleanup | No | No | Yes | Yes |
| Feature flag | `counter` (default) | `counter` (default) | `instance-aware-counter` | `instance-aware-counter` |
| Use case | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |
StrictCounter and LaxCounter implement CounterTrait. Both instance-aware
types implement InstanceAwareCounterTrait. Write generic code against either trait:
```rust
// Example: bumping a counter by 1 (strict or lax)
async fn bump<C: CounterTrait>(counter: &C, key: &RedisKey) -> Result<i64, DistkitError> {
    counter.inc(key, 1).await
}

async fn report_connection<C: InstanceAwareCounterTrait>(
    counter: &C,
    key: &RedisKey,
    delta: i64,
) -> Result<i64, DistkitError> {
    let (total, _mine) = counter.inc(key, delta).await?;
    Ok(total)
}
```
§Key types
- `RedisKey` – A validated key string (non-empty, 255 chars max, no colons). Constructed via `TryFrom<String>`.
- `CounterOptions` – Configuration bundle for counter construction. Carries a prefix, a Redis connection, and the `allowed_lag` duration (default 20 ms). Implements `Clone`, so the same options can be passed to both counter types.
- `CounterTrait` – The async trait that both counter types implement: `inc`, `dec`, `get`, `set`, `del`, `clear`.
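The documented validation rules for `RedisKey` can be restated as a standalone sketch. This is purely illustrative: the function below is not distkit's actual implementation, just the three rules (non-empty, at most 255 characters, no colons) written out.

```rust
// Illustrative restatement of the documented RedisKey validation rules.
// Not distkit's actual code.
fn validate_redis_key(s: &str) -> Result<&str, String> {
    if s.is_empty() {
        return Err("key must be non-empty".to_string());
    }
    if s.chars().count() > 255 {
        return Err("key must be at most 255 characters".to_string());
    }
    if s.contains(':') {
        return Err("key must not contain ':'".to_string());
    }
    Ok(s)
}

fn main() {
    assert!(validate_redis_key("my_app").is_ok());
    assert!(validate_redis_key("").is_err());
    assert!(validate_redis_key("a:b").is_err()); // colons are rejected
    assert!(validate_redis_key(&"x".repeat(256)).is_err());
    println!("all checks passed");
}
```

Rejecting colons suggests the crate reserves `:` for its own key namespacing under the configured prefix.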
§Error handling
All fallible operations return `DistkitError`:
- `InvalidRedisKey` – Returned by `RedisKey::try_from` when the input is empty, longer than 255 characters, or contains a colon.
- `RedisError` – A Redis operation failed (connection lost, script error, etc.). Wraps `redis::RedisError`.
- `CounterError` – A counter operation failed (e.g., a batch flush could not commit state to Redis).
- `MutexPoisoned` – An internal lock was poisoned. This indicates a previous panic inside a critical section.
- `CustomError` – Catch-all for internal errors.
- `TrypemaError` – A rate-limiting operation failed (only present with the `trypema` feature).
§Instance-aware counters
Enable the instance-aware-counter feature to access per-instance distributed
counters.
```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter"] }
```
Instance-aware counters track each running instance’s contribution separately.
The cumulative total is the sum of all live instances. When an instance stops
heartbeating for longer than dead_instance_threshold_ms (default 30 s), its
contribution is automatically subtracted from the cumulative on the next
operation by any surviving instance.
This makes them well-suited for:
- Connection pool sizing – each server reports its active connection count; the cumulative is the cluster-wide total.
- Live session counting – contributions disappear naturally when a node restarts or crashes.
- Per-node metrics – see both the global total and each instance’s slice.
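The heartbeat-and-cleanup mechanism described above can be modeled with plain data structures. The sketch below is illustrative only (it stands in for distkit's Redis-backed implementation): each instance's slice carries its last heartbeat, every operation refreshes that heartbeat, and slices whose heartbeat is older than the threshold are dropped from the cumulative.

```rust
use std::collections::HashMap;
use std::time::{Duration, Instant};

// Illustrative model of heartbeat-based dead-instance cleanup
// (not distkit's actual implementation).
struct InstanceSlices {
    threshold: Duration,
    // instance id -> (contribution, last heartbeat)
    slices: HashMap<String, (i64, Instant)>,
}

impl InstanceSlices {
    fn inc(&mut self, instance: &str, delta: i64, now: Instant) -> i64 {
        let entry = self.slices.entry(instance.to_string()).or_insert((0, now));
        entry.0 += delta;
        entry.1 = now; // every operation doubles as a heartbeat
        self.cumulative(now)
    }

    // Contributions from instances that stopped heartbeating are removed.
    fn cumulative(&mut self, now: Instant) -> i64 {
        let threshold = self.threshold;
        self.slices.retain(|_, (_, beat)| now.duration_since(*beat) < threshold);
        self.slices.values().map(|(count, _)| count).sum()
    }
}

fn main() {
    let start = Instant::now();
    let mut counter = InstanceSlices {
        threshold: Duration::from_secs(30),
        slices: HashMap::new(),
    };
    counter.inc("server_a", 10, start);
    let total = counter.inc("server_b", 5, start);
    assert_eq!(total, 15);
    // 31 simulated seconds later, server_a has not heartbeated,
    // so server_b's next operation drops its slice.
    let later = start + Duration::from_secs(31);
    counter.inc("server_b", 0, later);
    assert_eq!(counter.cumulative(later), 5);
    println!("cumulative after cleanup: 5");
}
```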
§StrictInstanceAwareCounter
Every call is immediately consistent with Redis. set and del bump a
per-key epoch that causes stale instances to reset their stored count on
their next operation, preventing double-counting.
```rust
let prefix = RedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
    StrictInstanceAwareCounterOptions::new(prefix, conn),
);
let key = RedisKey::try_from("connections".to_string())?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;

// Decrement this instance's contribution.
let (total, mine) = counter.dec(&key, 2).await?;

// Read without modifying.
let (total, mine) = counter.get(&key).await?;

// Set this instance's slice to an exact value without bumping the epoch.
// Other instances are not affected.
let (total, mine) = counter.set_on_instance(&key, 10).await?;

// Set the global total to an exact value and bump the epoch, making all
// other instances' stored counts stale.
let (total, mine) = counter.set(&key, 100).await?;

// Remove only this instance's contribution (no epoch bump).
let (total, removed) = counter.del_on_instance(&key).await?;

// Delete the key globally and bump the epoch.
let (old_total, _) = counter.del(&key).await?;
```
§Dead-instance cleanup
Each instance sends a heartbeat on every operation. If a process silently dies, surviving instances automatically remove its contribution the next time any of them touches the same key.
```rust
let prefix = RedisKey::try_from("my_app".to_string())?;
let key = RedisKey::try_from("connections".to_string())?;

// Two independent instances sharing the same prefix.
let opts = |conn| StrictInstanceAwareCounterOptions {
    prefix: prefix.clone(),
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000, // 30 s
};
let server_a = StrictInstanceAwareCounter::new(opts(conn1));
let server_b = StrictInstanceAwareCounter::new(opts(conn2));

server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key, 5).await?;  // cumulative = 15

// server_a goes offline. After 30 s, server_b's next call removes its
// contribution automatically.
let (total, _) = server_b.get(&key).await?; // total = 5 once cleaned up
```
§LaxInstanceAwareCounter
A buffered wrapper around StrictInstanceAwareCounter. inc calls
accumulate locally and are flushed to the strict counter in bulk every
flush_interval (default 20 ms). Global operations (set, del, clear)
flush any pending delta first, then delegate immediately.
Use this when you have many inc/dec calls per second and can tolerate a
small consistency lag.
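The buffer-and-flush pattern behind the lax counters can be sketched independently of Redis. The sketch below is illustrative only (names and structure are not distkit's): increments accumulate in a local delta map, reads return the committed value plus the pending delta, and a flush merges the deltas into the backing store in one batch, which is what the background task would do every `flush_interval`.

```rust
use std::collections::HashMap;

// Illustrative sketch of the lax counters' buffering (not distkit's code).
struct BufferedCounter {
    pending: HashMap<String, i64>, // local deltas, not yet visible remotely
    backing: HashMap<String, i64>, // stands in for the Redis-backed counter
}

impl BufferedCounter {
    // Warm path: no round-trip, just a local hash-map update.
    fn inc(&mut self, key: &str, delta: i64) -> i64 {
        *self.pending.entry(key.to_string()).or_insert(0) += delta;
        self.get(key)
    }

    // Local estimate: committed value plus pending delta.
    fn get(&self, key: &str) -> i64 {
        self.backing.get(key).copied().unwrap_or(0)
            + self.pending.get(key).copied().unwrap_or(0)
    }

    // What the background task would do on each flush interval.
    fn flush(&mut self) {
        for (key, delta) in self.pending.drain() {
            *self.backing.entry(key).or_insert(0) += delta;
        }
    }
}

fn main() {
    let mut c = BufferedCounter { pending: HashMap::new(), backing: HashMap::new() };
    c.inc("connections", 1);
    c.inc("connections", 1);
    assert_eq!(c.get("connections"), 2); // estimate includes pending delta
    c.flush();
    assert!(c.pending.is_empty());
    assert_eq!(c.backing["connections"], 2);
    println!("flushed total: {}", c.backing["connections"]);
}
```

The trade-off is exactly the one in the comparison table: sub-microsecond increments in exchange for a bounded window in which remote readers see a stale total.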
```rust
let prefix = RedisKey::try_from("my_app".to_string())?;
// Note: LaxInstanceAwareCounterOptions::new(prefix, conn) would give the same result.
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
    prefix,
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000,
    flush_interval: Duration::from_millis(20),
    allowed_lag: Duration::from_millis(20),
});
let key = RedisKey::try_from("connections".to_string())?;

// Returns the local estimate immediately — no Redis round-trip on warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;

// Decrement also stays local until flushed.
let (local_total, mine) = counter.dec(&key, 1).await?;

// get() also returns the local estimate (cumulative + pending delta).
// A fresh instance with no local state falls back to the strict counter.
let (total, mine) = counter.get(&key).await?;
```
§Rate limiting (trypema)
Enable the trypema feature to access distributed rate limiting.
Trypema documentation website: https://trypema.davidoyinbo.com
```toml
[dependencies]
distkit = { version = "0.2", features = ["trypema"] }
```
All public types from the trypema crate are
re-exported under `distkit::trypema`. The module provides:
- Sliding-window rate limiting with configurable window size and rate.
- Three providers – local (in-process via `DashMap`), Redis-backed (distributed enforcement via Lua scripts), and hybrid (local fast-path with periodic Redis sync).
- Two strategies – absolute (binary allow/reject) and suppressed (probabilistic degradation that smoothly ramps rejection probability).
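The sliding-window idea itself can be shown in a few lines. The sketch below is illustrative only, not trypema's implementation (trypema buckets events into rate groups and, for the Redis provider, enforces them atomically in Lua): events older than the window are evicted, and a request is admitted while the remaining count is under the limit.

```rust
use std::collections::VecDeque;
use std::time::{Duration, Instant};

// Minimal sliding-window limiter sketch (not trypema's actual code).
struct SlidingWindow {
    window: Duration,
    limit: usize,
    events: VecDeque<Instant>,
}

impl SlidingWindow {
    fn allow(&mut self, now: Instant) -> bool {
        // Evict events that have fallen out of the window.
        while let Some(&front) = self.events.front() {
            if now.duration_since(front) >= self.window {
                self.events.pop_front();
            } else {
                break;
            }
        }
        if self.events.len() < self.limit {
            self.events.push_back(now);
            true
        } else {
            false
        }
    }
}

fn main() {
    let start = Instant::now();
    let mut rl = SlidingWindow {
        window: Duration::from_secs(1),
        limit: 3,
        events: VecDeque::new(),
    };
    assert!(rl.allow(start));
    assert!(rl.allow(start));
    assert!(rl.allow(start));
    assert!(!rl.allow(start)); // fourth request in the same window is rejected
    // After the window passes, capacity is restored.
    assert!(rl.allow(start + Duration::from_secs(2)));
    println!("sliding window behaves as expected");
}
```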
§Local rate limiting (absolute)
Use the local provider for in-process rate limiting with sub-microsecond latency. The absolute strategy gives a deterministic allow/reject decision.
```rust
let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
}));

// Optional: start background cleanup for stale keys.
rl.run_cleanup_loop();

let rate = RateLimit::try_from(10.0).unwrap(); // 10 requests per second
match rl.local().absolute().inc("user_123", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Process the request.
    }
    RateLimitDecision::Rejected { retry_after_ms, .. } => {
        // Back off and retry later.
        eprintln!("Rate limited, retry in {retry_after_ms} ms");
    }
    _ => {}
}
```
§Local rate limiting (suppressed)
The suppressed strategy smoothly ramps rejection probability as load approaches the limit, instead of a hard cutoff.
```rust
let rate = RateLimit::try_from(100.0).unwrap();
match rl.local().suppressed().inc("api_endpoint", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Well under the limit.
    }
    RateLimitDecision::Suppressed { is_allowed, suppression_factor } => {
        if is_allowed {
            // Approaching the limit but still admitted.
            tracing::info!("Suppression factor: {suppression_factor:.2}");
        } else {
            // Probabilistically rejected.
        }
    }
    RateLimitDecision::Rejected { .. } => {
        // Over the hard limit (rate * hard_limit_factor).
    }
}
```
§Redis-backed rate limiting
For distributed enforcement across multiple processes or servers, use the Redis provider. Each call executes an atomic Lua script.
```rust
let window = WindowSizeSeconds::try_from(60)?;
let bucket = RateGroupSizeMs::try_from(100)?;
let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
    redis: RedisRateLimiterOptions {
        connection_manager: conn,
        prefix: None,
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
        sync_interval_ms: SyncIntervalMs::default(),
    },
}));
rl.run_cleanup_loop();

let key = RedisKey::try_from("user_123".to_string())?;
let rate = RateLimit::try_from(50.0)?;

// Distributed absolute enforcement.
let decision = rl.redis().absolute().inc(&key, &rate, 1).await?;

// Or use the hybrid provider for a local fast-path with Redis sync.
let decision = rl.hybrid().absolute().inc(&key, &rate, 1).await?;
```
See the trypema documentation for full API details and advanced configuration.
Modules§
- counter (`counter` feature) – Distributed counter implementations.
- icounter (`instance-aware-counter` feature) – Instance-aware counters that track each running instance’s contribution separately.
- trypema (`trypema` feature) – Rate limiting via the `trypema` crate.
Structs§
- RedisKey – A validated Redis key.
Enums§
- DistkitError – Top-level error type for all distkit operations.