Crate distkit

A toolkit of distributed systems primitives for Rust, backed by Redis.

distkit (DISTributed system KIT) provides building blocks for distributed applications. The crate currently offers four modules, all of which run on the tokio runtime:

  • Counters (counter feature, enabled by default) – two counter implementations (strict and lax) that share a common async trait, letting you choose between immediate consistency and high-throughput eventual consistency.
  • Instance-aware counters (instance-aware-counter feature, opt-in) – counters where each running process owns a named slice of the total, with automatic cleanup of contributions from processes that stop heartbeating. For example, in a cluster where each server reports its active connection count, the cumulative value is the cluster-wide total. When a server dies, its contribution is automatically subtracted; if the server was only temporarily offline, its contribution is added back when it comes back online.
  • Distributed locks (lock feature, opt-in) – Redis-backed Mutex and RwLock whose surfaces mirror tokio::sync::Mutex / tokio::sync::RwLock. Acquire returns an RAII guard; the lease is renewed in the background and released on drop.
  • Rate limiting (trypema feature, opt-in) – re-exports the trypema crate, providing sliding-window rate limiting with local, Redis-backed, and hybrid providers.

§Feature flags

| Feature | Default | Description |
| --- | --- | --- |
| counter | yes | Distributed counters (StrictCounter, LaxCounter) |
| instance-aware-counter | no | Instance-aware distributed counters (StrictInstanceAwareCounter, LaxInstanceAwareCounter) |
| lock | no | Distributed Mutex and RwLock |
| trypema | no | Rate limiting via the trypema crate |

§Quick start

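// Connect to Redis (the URL is illustrative; point it at your own server).
let client = redis::Client::open("redis://127.0.0.1:6379")?;
let conn = client.get_connection_manager().await?;

// Servers sharing the same prefix coordinate through the same Redis keys.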
let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let options = CounterOptions::new(prefix, conn);

// Strict: every call hits Redis immediately
let strict = StrictCounter::new(options.clone());
let key = DistkitRedisKey::try_from("page_views".to_string())?;
strict.inc(&key, 1).await?;
let total = strict.get(&key).await?;

// Lax: buffered in memory, flushed to Redis every ~20 ms
let lax = LaxCounter::new(options);
lax.inc(&key, 1).await?;
let approx = lax.get(&key).await?;

§Choosing a counter

| | StrictCounter | LaxCounter | StrictInstanceAwareCounter | LaxInstanceAwareCounter |
| --- | --- | --- | --- | --- |
| Consistency | Immediate | Eventual (default: ~20 ms lag) | Immediate | Eventual (flush_interval lag) |
| inc latency | Redis round-trip | Sub-microsecond (warm path) | Redis round-trip | Sub-microsecond (warm path) |
| Redis I/O | Every operation | Batched on interval | Every inc | Batched on interval |
| set / del | Immediate | Immediate | Immediate (bumps epoch) | Flushes pending delta, then immediate |
| Per-instance tracking | No | No | Yes | Yes |
| Dead-instance cleanup | No | No | Yes | Yes |
| Feature flag | counter (default) | counter (default) | instance-aware-counter | instance-aware-counter |
| Use case | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |

StrictCounter and LaxCounter implement CounterTrait. Both instance-aware types implement InstanceAwareCounterTrait. Write generic code against either trait:

// Example: bumping a counter by 1 (strict or lax)
async fn bump<C: CounterTrait>(counter: &C, key: &DistkitRedisKey) -> Result<i64, DistkitError> {
    counter.inc(key, 1).await
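}

// Example: applying a delta on an instance-aware counter (strict or lax).
// Returns the cumulative total and discards this instance's slice.
async fn report_connection<C: InstanceAwareCounterTrait>(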
    counter: &C,
    key: &DistkitRedisKey,
    delta: i64,
) -> Result<i64, DistkitError> {
    let (total, _mine) = counter.inc(key, delta).await?;
    Ok(total)
}

§Key types

  • DistkitRedisKey – A validated key string (non-empty, 255 chars max, no colons), with helpers like new, new_or_panic, and try_sanitize.
  • CounterComparator – The comparison operator used by conditional writes: Eq, Lt, Gt, Ne, or Nil.
  • CounterOptions – Configuration bundle for counter construction. Carries a prefix, Redis connection, and the allowed_lag duration (default 20 ms). Implements Clone, so the same options can be passed to both counter types.
  • CounterTrait – The async trait that both counter types implement: inc, inc_if, dec, get, set, set_if, del, clear, and multi-key helpers including inc_all_if and set_all_if.
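
The validation rules listed for DistkitRedisKey can be pictured with a small std-only sketch. The names validate_key and KeyError are hypothetical and not part of distkit, and the sketch assumes the 255 limit is counted in characters rather than bytes, which the text above does not specify.

```rust
/// Hypothetical error type for this sketch; distkit reports InvalidRedisKey instead.
#[derive(Debug, PartialEq)]
enum KeyError {
    Empty,
    TooLong,
    ContainsColon,
}

/// Checks the three documented rules: non-empty, at most 255 characters, no colons.
/// Assumption: the limit is counted in characters, not bytes.
fn validate_key(raw: &str) -> Result<&str, KeyError> {
    if raw.is_empty() {
        return Err(KeyError::Empty);
    }
    if raw.chars().count() > 255 {
        return Err(KeyError::TooLong);
    }
    if raw.contains(':') {
        return Err(KeyError::ContainsColon);
    }
    Ok(raw)
}
```

In real code, prefer DistkitRedisKey::try_from, which applies the crate's own checks and returns DistkitError on failure.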

§Conditional writes

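Use CounterComparator with the *_if methods to apply a write only when the current value satisfies a condition. As the example below shows, each call returns a pair: a successful write yields (new value, previous value), while a failed comparison leaves the counter untouched and returns the current value in both positions.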

let key = DistkitRedisKey::try_from("orders".to_string())?;
counter.set(&key, 10).await?;

assert_eq!(
    counter.inc_if(&key, CounterComparator::Eq(10), 5).await?,
    (15, 10)
);
assert_eq!(
    counter.set_if(&key, CounterComparator::Gt(20), 99).await?,
    (15, 15)
);
assert_eq!(
    counter
        .inc_all_if(&[
            (&key, CounterComparator::Eq(15), 2),
            (&key, CounterComparator::Nil, 3),
        ])
        .await?,
    vec![(&key, 17, 15), (&key, 20, 17)]
);
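
To make the comparator semantics concrete, here is a minimal in-memory model that reproduces the return values in the example above on a plain integer. It is a sketch, not the crate's implementation: the Comparator enum and the free functions are local stand-ins, and reading Nil as "no condition, always apply" is an assumption inferred from the inc_all_if result.

```rust
/// Local stand-in for distkit's CounterComparator, used only in this sketch.
#[derive(Clone, Copy, Debug)]
enum Comparator {
    Eq(i64),
    Lt(i64),
    Gt(i64),
    Ne(i64),
    /// Assumed to mean "no condition": the write always applies.
    Nil,
}

impl Comparator {
    /// Returns true when `current` satisfies the condition.
    fn matches(self, current: i64) -> bool {
        match self {
            Comparator::Eq(v) => current == v,
            Comparator::Lt(v) => current < v,
            Comparator::Gt(v) => current > v,
            Comparator::Ne(v) => current != v,
            Comparator::Nil => true,
        }
    }
}

/// Conditional increment; returns (value after the call, value before it).
fn inc_if(value: &mut i64, cmp: Comparator, delta: i64) -> (i64, i64) {
    let before = *value;
    if cmp.matches(before) {
        *value += delta;
    }
    (*value, before)
}

/// Conditional set with the same return shape.
fn set_if(value: &mut i64, cmp: Comparator, target: i64) -> (i64, i64) {
    let before = *value;
    if cmp.matches(before) {
        *value = target;
    }
    (*value, before)
}
```

The real methods run against Redis, so the comparison and the write happen on the server rather than on a local integer as they do here.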

§Error handling

All fallible operations return DistkitError:

  • InvalidRedisKey – Returned by DistkitRedisKey::try_from when the input is empty, longer than 255 characters, or contains a colon.
  • RedisError – A Redis operation failed (connection lost, script error, etc.). Wraps redis::RedisError.
  • CounterError – A counter operation failed (e.g., a batch flush could not commit state to Redis).
  • MutexPoisoned – An internal lock was poisoned. This indicates a previous panic inside a critical section.
  • CustomError – Catch-all for internal errors.
  • LockError – A lock operation failed (only present with the lock feature): AcquireFail (a non-blocking acquire would block), Timeout (a time-bounded acquire ran out the clock), RetriesExhausted (a retry-bounded acquire used up its retries), NotOwner, or the InvalidTtl / InvalidOwner validation errors.
  • TrypemaError – A rate-limiting operation failed (only present with the trypema feature).

§Instance-aware counters

Enable the instance-aware-counter feature to access per-instance distributed counters.

[dependencies]
distkit = { version = "0.4", features = ["instance-aware-counter"] }

Instance-aware counters track each running instance’s contribution separately. The cumulative total is the sum of the contributions of all live instances. When an instance stops heartbeating for longer than dead_instance_threshold_ms (default 30 s), its contribution is automatically subtracted from the cumulative total on the next operation by any surviving instance.

This makes them well-suited for:

  • Connection pool sizing – each server reports its active connection count; the cumulative is the cluster-wide total.
  • Live session counting – contributions disappear naturally when a node restarts or crashes.
  • Per-node metrics – see both the global total and each instance’s slice.

Conditional instance-aware writes follow the same pattern:

  • inc_if and set_if compare against the cumulative total.
  • set_on_instance_if and set_all_on_instance_if compare against this instance’s current slice.

§StrictInstanceAwareCounter

Every call is immediately consistent with Redis. set and del bump a per-key epoch that causes stale instances to reset their stored count on their next operation, preventing double-counting.

let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
    StrictInstanceAwareCounterOptions::new(prefix, conn),
);

let key = DistkitRedisKey::try_from("connections".to_string())?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;

// Decrement this instance's contribution.
let (total, mine) = counter.dec(&key, 2).await?;

// Read without modifying.
let (total, mine) = counter.get(&key).await?;

// Set this instance's slice to an exact value without bumping the epoch.
// Other instances are not affected.
let (total, mine) = counter.set_on_instance(&key, 10).await?;

// Set the global total to an exact value and bump the epoch, making all
// other instances' stored counts stale.
let (total, mine) = counter.set(&key, 100).await?;

// Remove only this instance's contribution (no epoch bump).
let (total, removed) = counter.del_on_instance(&key).await?;

// Delete the key globally and bump the epoch.
let (old_total, _) = counter.del(&key).await?;

§Dead-instance cleanup

Each instance sends a heartbeat on every operation. If a process silently dies, surviving instances automatically remove its contribution the next time any of them touches the same key.

let prefix = DistkitRedisKey::try_from("my_app".to_string())?;
let key = DistkitRedisKey::try_from("connections".to_string())?;

// Two independent instances sharing the same prefix.
let opts = |conn| StrictInstanceAwareCounterOptions {
    prefix: prefix.clone(),
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000, // 30 s
};
let server_a = StrictInstanceAwareCounter::new(opts(conn1));
let server_b = StrictInstanceAwareCounter::new(opts(conn2));

server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key,  5).await?; // cumulative = 15

// server_a goes offline. Once it has been silent for 30 s, server_b's next
// call removes server_a's contribution automatically.
let (total, _) = server_b.get(&key).await?; // total = 5 once cleaned up
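
The cleanup rule can be modelled without Redis. The sketch below is a simplified in-memory illustration, not the crate's algorithm: Cluster and Slice are hypothetical types, time is passed in explicitly as milliseconds, and a stale slice is simply skipped when summing (the crate instead subtracts it from a stored cumulative value).

```rust
use std::collections::HashMap;

/// One instance's slice plus the time of its last heartbeat (milliseconds).
struct Slice {
    count: i64,
    last_heartbeat_ms: u64,
}

/// In-memory stand-in for the shared state of a single counter key.
struct Cluster {
    threshold_ms: u64,
    slices: HashMap<String, Slice>,
}

impl Cluster {
    fn new(threshold_ms: u64) -> Self {
        Cluster { threshold_ms, slices: HashMap::new() }
    }

    /// Adds `delta` to the instance's slice and records a heartbeat at `now_ms`.
    /// Returns (cumulative over live instances, this instance's slice).
    fn inc(&mut self, instance: &str, delta: i64, now_ms: u64) -> (i64, i64) {
        let slice = self
            .slices
            .entry(instance.to_string())
            .or_insert(Slice { count: 0, last_heartbeat_ms: now_ms });
        slice.count += delta;
        slice.last_heartbeat_ms = now_ms;
        let mine = slice.count;
        (self.live_total(now_ms), mine)
    }

    /// Sums only the slices whose last heartbeat is within the threshold.
    fn live_total(&self, now_ms: u64) -> i64 {
        self.slices
            .values()
            .filter(|s| now_ms.saturating_sub(s.last_heartbeat_ms) <= self.threshold_ms)
            .map(|s| s.count)
            .sum()
    }
}
```

With a 30 000 ms threshold, an instance that last wrote at t = 0 no longer counts at t = 40 000, and counts again as soon as it performs another operation.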

§LaxInstanceAwareCounter

A buffered wrapper around StrictInstanceAwareCounter. inc calls accumulate locally and are flushed to the strict counter in bulk every flush_interval (default 20 ms). Global operations (set, del, clear) flush any pending delta first, then delegate immediately.

Use this when you have many inc/dec calls per second and can tolerate a small consistency lag.

let prefix = DistkitRedisKey::try_from("my_app".to_string())?;

// Equivalent to LaxInstanceAwareCounterOptions::new(prefix, conn); the fields are spelled out for illustration.
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
    prefix,
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000,
    flush_interval: Duration::from_millis(20),
    allowed_lag:    Duration::from_millis(20),
});

let key = DistkitRedisKey::try_from("connections".to_string())?;

// Returns the local estimate immediately — no Redis round-trip on warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;

// Decrement also stays local until flushed.
let (local_total, mine) = counter.dec(&key, 1).await?;

// get() also returns the local estimate (cumulative + pending delta).
// A fresh instance with no local state falls back to the strict counter.
let (total, mine) = counter.get(&key).await?;
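
The buffering idea behind the lax counters can be sketched in a few lines. BufferedCounter is a hypothetical in-memory model, not the crate's type: it keeps a committed value (standing in for Redis), a pending local delta, and a count of writes that reached the backing store. In the real crate the flush is driven by a background timer rather than called by hand.

```rust
/// In-memory stand-in for a buffered counter over a slower backing store.
struct BufferedCounter {
    /// Value last committed to the backing store.
    committed: i64,
    /// Delta accumulated locally since the last flush.
    pending: i64,
    /// Number of writes that reached the backing store.
    store_writes: u32,
}

impl BufferedCounter {
    fn new() -> Self {
        BufferedCounter { committed: 0, pending: 0, store_writes: 0 }
    }

    /// Buffers the delta and returns the local estimate without touching the store.
    fn inc(&mut self, delta: i64) -> i64 {
        self.pending += delta;
        self.committed + self.pending
    }

    /// Commits the pending delta in a single write; a no-op when nothing is pending.
    fn flush(&mut self) {
        if self.pending != 0 {
            self.committed += self.pending;
            self.pending = 0;
            self.store_writes += 1;
        }
    }

    /// Global set: flush the pending delta first, then overwrite immediately.
    fn set(&mut self, value: i64) -> i64 {
        self.flush();
        self.committed = value;
        self.store_writes += 1;
        self.committed
    }
}
```

A thousand inc calls followed by one flush cost a single store write in this model, which is the trade the lax counters make: fewer round-trips in exchange for a short window in which other processes see a stale value.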

§Distributed locks

Enable the lock feature to access Redis-backed distributed locks.

[dependencies]
distkit = { version = "0.4", features = ["lock"] }

Mutex (mutual exclusion) and RwLock (reader-writer) mirror the surface of tokio::sync::Mutex / tokio::sync::RwLock as closely as a network lock allows. Both are built from LockOptions (defaults: ttl 30 s, a fresh UUID v4 owner, max_wait None = wait until acquired, retry_interval 50 ms); use LockOptionsBuilder (via LockOptions::builder) to override fields. One lock object owns exactly one resource — the key and owner are bound at construction.

Unlike tokio’s locks, the guards hold no inner data — they are pure access tokens, like tokio::Mutex<()>. A held lock renews its lease in the background every ttl/3; releasing happens on Drop (best-effort, fire-and-forget) or via an explicit awaitable release() that returns the final LockGuardState. Because the lease lives in Redis, not in this process, code whose correctness depends on the lock should re-check get_state (no Redis round-trip) before its critical section rather than trusting the guard’s existence alone. Each guard also exposes get_on_attempt — the zero-based acquire poll that won the lock (0 on the first try, higher when it waited through contention) — handy for metrics and backoff tuning.
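
The advice to re-check the guard state follows from how a lease behaves. The sketch below is a hypothetical model with explicit millisecond timestamps, not distkit code: Lease and its methods are made up for illustration, and only the ttl/3 renewal period comes from the text above.

```rust
use std::time::Duration;

/// Renewal period described in the text: one third of the TTL.
fn renew_interval(ttl: Duration) -> Duration {
    ttl / 3
}

/// Hypothetical model of a lease that is valid until `expires_at_ms`.
struct Lease {
    ttl_ms: u64,
    expires_at_ms: u64,
}

impl Lease {
    fn acquire(now_ms: u64, ttl_ms: u64) -> Self {
        Lease { ttl_ms, expires_at_ms: now_ms + ttl_ms }
    }

    /// True while the lease has not yet expired.
    fn is_held(&self, now_ms: u64) -> bool {
        now_ms < self.expires_at_ms
    }

    /// Extends the lease by a full TTL from `now_ms`; fails if it already expired.
    fn renew(&mut self, now_ms: u64) -> bool {
        if !self.is_held(now_ms) {
            return false;
        }
        self.expires_at_ms = now_ms + self.ttl_ms;
        true
    }
}
```

With the default 30 s TTL the renewal period is 10 s, so two consecutive renewals can be missed before the lease lapses. A holder that stalls for longer than the TTL (a long pause or a network partition, say) finds the lease gone, which is why the guard's existence alone is not proof of ownership.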

§Mutex

let redis_url = std::env::var("REDIS_URL")
    .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let client = redis::Client::open(redis_url)?;
let conn = client.get_connection_manager().await?;

let key = DistkitRedisKey::try_from("invoice_42".to_string())?;
let mutex = Mutex::new(LockOptions::new(key, conn));

// One non-blocking attempt; Err(LockError::AcquireFail) if already held.
let guard = mutex.try_lock().await?;

// The lease is renewed in the background. Re-check before relying on it.
if guard.get_state() == LockGuardState::Acquired {
    // ... critical section ...
}

// Which acquire attempt won the lock: 0 on the first poll, higher under contention.
let _attempts = guard.get_on_attempt();

// Release and observe the final state (or just drop the guard).
let state = guard.release().await?;
assert_eq!(state, LockGuardState::Released);

mutex.lock().await? waits up to max_wait (forever when None), and mutex.try_lock_with_timeout(timeout).await? waits a bounded time — polling at the lock’s configured retry_interval — and returns LockError::Timeout if the deadline passes first. mutex.try_lock_with_retries(max_retries).await? instead bounds by attempt count, polling at the same retry_interval and returning LockError::RetriesExhausted after max_retries retries. (try_lock_for(timeout, retry_interval) is deprecated in favor of try_lock_with_timeout.)
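
The retry-bounded form can be pictured as a simple polling loop. This is a blocking, std-only sketch under stated assumptions, not the crate's async implementation: acquire_with_retries and AcquireError are hypothetical names, the closure stands in for one non-blocking acquire attempt, and the first poll is assumed not to count as a retry (so max_retries = 2 allows three polls in total).

```rust
use std::thread;
use std::time::Duration;

/// Hypothetical error for this sketch; distkit uses LockError::RetriesExhausted.
#[derive(Debug, PartialEq)]
enum AcquireError {
    RetriesExhausted,
}

/// Polls `try_acquire` once, then up to `max_retries` more times, sleeping
/// `retry_interval` between polls. Returns the zero-based index of the
/// attempt that succeeded.
fn acquire_with_retries<F>(
    mut try_acquire: F,
    max_retries: u32,
    retry_interval: Duration,
) -> Result<u32, AcquireError>
where
    F: FnMut() -> bool,
{
    for attempt in 0..=max_retries {
        if try_acquire() {
            return Ok(attempt);
        }
        // Sleep only when another poll will follow.
        if attempt < max_retries {
            thread::sleep(retry_interval);
        }
    }
    Err(AcquireError::RetriesExhausted)
}
```

The returned index plays the same role as get_on_attempt on a guard: 0 means the lock was free on the first poll, and higher values indicate contention.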

§RwLock

Many readers may hold the lock at once; a writer holds it alone. The lock is writer-preferring: while a writer is waiting, new readers are turned away so the writer is not starved (the flip side is possible reader starvation under constant writers).

let redis_url = std::env::var("REDIS_URL")
    .unwrap_or_else(|_| "redis://127.0.0.1:6379".to_string());
let client = redis::Client::open(redis_url)?;
let conn = client.get_connection_manager().await?;

let key = DistkitRedisKey::try_from("config_blob".to_string())?;
let rw = RwLock::new(LockOptions::new(key, conn));

// Shared read access — multiple readers coexist.
let r1 = rw.try_read().await?;
let r2 = rw.try_read().await?;
r1.release().await?;
r2.release().await?;

// Exclusive write access: waits until all readers and any current writer have released.
let w = rw.write().await?;
w.release().await?;
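
Writer preference can be illustrated with a small state machine. RwState is a hypothetical in-memory model, not how distkit stores lock state in Redis; it only shows the admission rule described above, where a waiting writer turns new readers away.

```rust
/// Hypothetical in-memory state of a writer-preferring reader-writer lock.
#[derive(Default)]
struct RwState {
    readers: u32,
    writer_held: bool,
    writers_waiting: u32,
}

impl RwState {
    /// A reader is admitted only when no writer holds or is waiting for the lock.
    fn try_read(&mut self) -> bool {
        if self.writer_held || self.writers_waiting > 0 {
            return false;
        }
        self.readers += 1;
        true
    }

    fn release_read(&mut self) {
        self.readers = self.readers.saturating_sub(1);
    }

    /// Registers a waiting writer, which blocks new readers from this point on.
    fn announce_writer(&mut self) {
        self.writers_waiting += 1;
    }

    /// A waiting writer takes the lock once all readers and any writer have left.
    fn try_promote_writer(&mut self) -> bool {
        if self.writers_waiting == 0 || self.writer_held || self.readers > 0 {
            return false;
        }
        self.writers_waiting -= 1;
        self.writer_held = true;
        true
    }

    fn release_write(&mut self) {
        self.writer_held = false;
    }
}
```

In this model, two readers that already hold the lock keep it, but a third try_read issued after announce_writer fails until the writer has acquired and released. That is the reader-starvation risk mentioned above if writers arrive continuously.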

Each side has the same forms: waiting (read / write), non-blocking (try_read / try_write), time-bounded (try_read_with_timeout / try_write_with_timeout), and retry-bounded (try_read_with_retries / try_write_with_retries). The bounded forms poll at the lock’s configured retry_interval; the retry-bounded forms return LockError::RetriesExhausted after max_retries retries. (The older try_read_for / try_write_for forms are deprecated.)

§Rate limiting (trypema)

Enable the trypema feature to access distributed rate limiting.

Trypema documentation website: https://trypema.davidoyinbo.com

[dependencies]
distkit = { version = "0.4", features = ["trypema"] }

All public types from the trypema crate are re-exported under distkit::trypema. The module provides:

  • Sliding-window rate limiting with configurable window size and rate.
  • Three providers – local (in-process via DashMap), Redis-backed (distributed enforcement via Lua scripts), and hybrid (local fast-path with periodic Redis sync).
  • Two strategies – absolute (binary allow/reject) and suppressed (probabilistic degradation that smoothly ramps rejection probability).

§Local rate limiting (absolute)

Use the local provider for in-process rate limiting with sub-microsecond latency. The absolute strategy gives a deterministic allow/reject decision.

let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
}));

// Optional: start background cleanup for stale keys
rl.run_cleanup_loop();

let rate = RateLimit::try_from(10.0).unwrap(); // 10 requests per second

match rl.local().absolute().inc("user_123", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Process the request
    }
    RateLimitDecision::Rejected { retry_after_ms, .. } => {
        // Back off and retry later
        eprintln!("Rate limited, retry in {retry_after_ms} ms");
    }
    _ => {}
}
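
For intuition about what a sliding window with grouped buckets does, here is a minimal std-only model of the absolute strategy. It is an illustration under assumptions, not trypema's implementation: SlidingWindow is a hypothetical type, time is passed in as milliseconds, increments are grouped into buckets of bucket_ms (mirroring the role of rate_group_size_ms), and the window capacity is assumed to be the per-second rate multiplied by the window length in seconds.

```rust
use std::collections::VecDeque;

/// Hypothetical sliding-window counter with coarse buckets.
struct SlidingWindow {
    window_ms: u64,
    bucket_ms: u64,
    /// (bucket start in ms, count recorded in that bucket), oldest first.
    buckets: VecDeque<(u64, u64)>,
}

impl SlidingWindow {
    fn new(window_seconds: u64, bucket_ms: u64) -> Self {
        SlidingWindow { window_ms: window_seconds * 1000, bucket_ms, buckets: VecDeque::new() }
    }

    /// Drops buckets that have slid out of the window ending at `now_ms`.
    fn evict(&mut self, now_ms: u64) {
        let window_ms = self.window_ms;
        while self.buckets.front().map_or(false, |&(start, _)| start + window_ms <= now_ms) {
            self.buckets.pop_front();
        }
    }

    /// Absolute strategy: admit `n` only if the window total stays within capacity.
    /// Assumption: capacity = rate_per_second * window length in seconds.
    fn try_inc(&mut self, now_ms: u64, rate_per_second: f64, n: u64) -> bool {
        self.evict(now_ms);
        let capacity = (rate_per_second * (self.window_ms as f64 / 1000.0)) as u64;
        let total: u64 = self.buckets.iter().map(|&(_, count)| count).sum();
        if total + n > capacity {
            return false;
        }
        // Group the increment into the bucket that contains `now_ms`.
        let start = now_ms - now_ms % self.bucket_ms;
        let same_bucket = matches!(self.buckets.back(), Some(&(s, _)) if s == start);
        if same_bucket {
            if let Some(last) = self.buckets.back_mut() {
                last.1 += n;
            }
        } else {
            self.buckets.push_back((start, n));
        }
        true
    }
}
```

Larger buckets mean fewer entries to track per key at the cost of coarser expiry, since a whole bucket leaves the window at once.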

§Local rate limiting (suppressed)

The suppressed strategy smoothly ramps rejection probability as load approaches the limit, instead of a hard cutoff.

let rate = RateLimit::try_from(100.0).unwrap();

match rl.local().suppressed().inc("api_endpoint", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Well under the limit
    }
    RateLimitDecision::Suppressed { is_allowed, suppression_factor } => {
        if is_allowed {
            // Approaching the limit but still admitted
            tracing::info!("Suppression factor: {suppression_factor:.2}");
        } else {
            // Probabilistically rejected
        }
    }
    RateLimitDecision::Rejected { .. } => {
        // Over the hard limit (rate * hard_limit_factor)
    }
}
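
The relationship between a suppression factor and the admit/reject outcome can be sketched as follows. Both functions are hypothetical, and the linear ramp is an illustrative choice only: trypema's actual formula may differ. The sketch assumes the factor is 0.0 at or below the target rate and 1.0 at or above the hard limit (rate * hard_limit_factor), and that a request is admitted when a uniform draw in [0, 1) lands at or above the factor.

```rust
/// Illustrative ramp (an assumption, not trypema's formula): 0.0 at or below
/// `rate`, 1.0 at or above `rate * hard_limit_factor`, linear in between.
fn suppression_factor(observed_rate: f64, rate: f64, hard_limit_factor: f64) -> f64 {
    let hard_limit = rate * hard_limit_factor;
    if observed_rate <= rate {
        return 0.0;
    }
    if observed_rate >= hard_limit {
        return 1.0;
    }
    (observed_rate - rate) / (hard_limit - rate)
}

/// Admits the request when the uniform draw in [0, 1) is at or above the
/// factor, so a factor of 0.25 rejects roughly a quarter of requests.
fn admit(suppression_factor: f64, uniform_draw: f64) -> bool {
    uniform_draw >= suppression_factor
}
```

Passing the draw in as a parameter keeps the sketch deterministic; a caller would supply a value from a random number generator.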

§Redis-backed rate limiting

For distributed enforcement across multiple processes or servers, use the Redis provider. Each call executes an atomic Lua script.

let window = WindowSizeSeconds::try_from(60)?;
let bucket = RateGroupSizeMs::try_from(100)?;

let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
    redis: RedisRateLimiterOptions {
        connection_manager: conn,
        prefix: None,
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
        sync_interval_ms: SyncIntervalMs::default(),
    },
}));

rl.run_cleanup_loop();

let key = RedisKey::try_from("user_123".to_string())?;
let rate = RateLimit::try_from(50.0)?;

// Distributed absolute enforcement
let decision = rl.redis().absolute().inc(&key, &rate, 1).await?;

// Or use the hybrid provider for local fast-path with Redis sync
let decision = rl.hybrid().absolute().inc(&key, &rate, 1).await?;

See the trypema documentation for full API details and advanced configuration.

Modules§

counter (feature: counter)
Distributed counter implementations.
icounter (feature: instance-aware-counter)
Instance-aware counters that track each running instance’s contribution separately.
lock (feature: lock)
Distributed lock primitives.
trypema (feature: trypema)
Rate limiting via the trypema crate.

Structs§

DistkitRedisKey
A validated Redis key.

Enums§

CounterComparator
Comparison operator used by conditional counter writes.
DistkitError
Top-level error type for all distkit operations.

Type Aliases§

RedisKey
Backwards-compatible alias for DistkitRedisKey.