# distkit 0.2.2

A toolkit of distributed systems primitives for Rust, backed by Redis.

`distkit` (DISTributed system KIT) provides building blocks for distributed applications. The crate
currently offers three modules, all of which run on the Tokio runtime:

- **Counters** (`counter` feature, enabled by default) -- two counter
  implementations (strict and lax) that share a common async trait, letting you
  choose between immediate consistency and high-throughput eventual consistency.
- **Instance-aware counters** (`instance-aware-counter` feature) -- counters
  where each running process owns a named slice of the total, with automatic
  cleanup of contributions from processes that stop heartbeating. For example,
  in a cluster where each server reports its active connection count, the
  cumulative is the cluster-wide total; when a server dies, its contribution is
  automatically subtracted, and if it was only temporarily offline, the
  contribution is added back when it comes back online.
- **Rate limiting** (`trypema` feature, opt-in) -- re-exports the
  [`trypema`](https://docs.rs/trypema) crate, providing sliding-window rate
  limiting with local, Redis-backed, and hybrid providers.

# Feature flags

| Feature                  | Default | Description                                                                                       |
| ------------------------ | ------- | ------------------------------------------------------------------------------------------------- |
| `counter`                | **yes** | Distributed counters ([`StrictCounter`], [`LaxCounter`])                                          |
| `instance-aware-counter` | no      | Instance aware distributed counters ([`StrictInstanceAwareCounter`], [`LaxInstanceAwareCounter`]) |
| `trypema`                | no      | Rate limiting via the [`trypema`](https://docs.rs/trypema) crate                                  |
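
Features combine freely; for example, a `Cargo.toml` entry that enables both counter families plus rate limiting (the default `counter` feature stays on unless you disable default features) could look like:

```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter", "trypema"] }
```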

# Quick start

```rust
# use distkit::{RedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn = client.get_connection_manager().await?;
// Servers sharing the same prefix coordinate through the same Redis keys.
let prefix = RedisKey::try_from("my_app".to_string())?;
let options = CounterOptions::new(prefix, conn);

// Strict: every call hits Redis immediately
let strict = StrictCounter::new(options.clone());
let key = RedisKey::try_from("page_views".to_string())?;
strict.inc(&key, 1).await?;
let total = strict.get(&key).await?;

// Lax: buffered in memory, flushed to Redis every ~20 ms
let lax = LaxCounter::new(options);
lax.inc(&key, 1).await?;
let approx = lax.get(&key).await?;
# Ok(())
# }
```

# Choosing a counter

|                           | [`StrictCounter`]                      | [`LaxCounter`]                     | [`StrictInstanceAwareCounter`]        | [`LaxInstanceAwareCounter`]                |
| ------------------------- | -------------------------------------- | ---------------------------------- | ------------------------------------- | ------------------------------------------ |
| **Consistency**           | Immediate                              | Eventual (default: ~20 ms lag)     | Immediate                             | Eventual (`flush_interval` lag)            |
| **`inc` latency**         | Redis round-trip                       | Sub-microsecond (warm path)        | Redis round-trip                      | Sub-microsecond (warm path)                |
| **Redis I/O**             | Every operation                        | Batched on interval                | Every `inc`                           | Batched on interval                        |
| **`set` / `del`**         | Immediate                              | Immediate                          | Immediate (bumps epoch)               | Flushes pending delta, then immediate      |
| **Per-instance tracking** | No                                     | No                                 | Yes                                   | Yes                                        |
| **Dead-instance cleanup** | No                                     | No                                 | Yes                                   | Yes                                        |
| **Feature flag**          | `counter` (default)                    | `counter` (default)                | `instance-aware-counter`              | `instance-aware-counter`                   |
| **Use case**              | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |

[`StrictCounter`] and [`LaxCounter`] implement [`CounterTrait`]. Both instance-aware
types implement [`InstanceAwareCounterTrait`]. Write generic code against either trait:

```rust
# use distkit::{DistkitError, RedisKey, counter::CounterTrait};
// Example: bumping a counter by 1 (strict or lax)
async fn bump<C: CounterTrait>(counter: &C, key: &RedisKey) -> Result<i64, DistkitError> {
    counter.inc(key, 1).await
}
```

```rust,no_run
# use distkit::{DistkitError, RedisKey, icounter::InstanceAwareCounterTrait};
async fn report_connection<C: InstanceAwareCounterTrait>(
    counter: &C,
    key: &RedisKey,
    delta: i64,
) -> Result<i64, DistkitError> {
    let (total, _mine) = counter.inc(key, delta).await?;
    Ok(total)
}
```
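
As a usage sketch, the `bump` helper above accepts either counter implementation; the setup mirrors the quick start:

```rust
# use distkit::{DistkitError, RedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};
# async fn bump<C: CounterTrait>(counter: &C, key: &RedisKey) -> Result<i64, DistkitError> {
#     counter.inc(key, 1).await
# }
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn = client.get_connection_manager().await?;
let options = CounterOptions::new(RedisKey::try_from("my_app".to_string())?, conn);
let key = RedisKey::try_from("page_views".to_string())?;

// One helper, two implementations: strict hits Redis, lax buffers locally.
let exact = bump(&StrictCounter::new(options.clone()), &key).await?;
let approx = bump(&LaxCounter::new(options), &key).await?;
# Ok(())
# }
```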

# Key types

- [`RedisKey`] -- A validated key string (non-empty, at most 255 characters, no
  colons). Constructed via `TryFrom<String>`; see the sketch after this list.
- [`CounterOptions`] -- Configuration bundle for counter construction. Carries
  a prefix, Redis connection, and the `allowed_lag` duration (default 20 ms).
  Implements `Clone`, so the same options can be passed to both counter types.
- [`CounterTrait`] -- The async trait that both counter types implement:
  `inc`, `dec`, `get`, `set`, `del`, `clear`.
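
The `RedisKey` rules are easy to check directly; a small sketch exercising the constraints listed above:

```rust
# use distkit::RedisKey;
// Accepted: non-empty, at most 255 characters, no colons.
assert!(RedisKey::try_from("page_views".to_string()).is_ok());

// Rejected inputs produce DistkitError::InvalidRedisKey.
assert!(RedisKey::try_from(String::new()).is_err());     // empty
assert!(RedisKey::try_from("a:b".to_string()).is_err()); // contains a colon
assert!(RedisKey::try_from("x".repeat(256)).is_err());   // longer than 255 characters
```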

# Error handling

All fallible operations return [`DistkitError`]; a sketch of branching on it follows the list:

- **`InvalidRedisKey`** -- Returned by `RedisKey::try_from` when the input is
  empty, longer than 255 characters, or contains a colon.
- **`RedisError`** -- A Redis operation failed (connection lost, script error,
  etc.). Wraps [`redis::RedisError`].
- **`CounterError`** -- A counter operation failed (e.g., a batch flush could
  not commit state to Redis).
- **`MutexPoisoned`** -- An internal lock was poisoned. This indicates a
  previous panic inside a critical section.
- **`CustomError`** -- Catch-all for internal errors.
- **`TrypemaError`** -- A rate-limiting operation failed (only present with the
  `trypema` feature).
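
Calling code can branch on these variants with a `match`; a minimal sketch (variant payloads are elided with `..`, since their exact shapes are not spelled out here):

```rust
# use distkit::DistkitError;
fn describe(err: &DistkitError) -> &'static str {
    match err {
        DistkitError::InvalidRedisKey { .. } => "key is empty, too long, or contains a colon",
        DistkitError::RedisError { .. } => "Redis operation failed; often worth a retry",
        DistkitError::CounterError { .. } => "counter operation failed",
        DistkitError::MutexPoisoned { .. } => "a previous panic poisoned an internal lock",
        _ => "other error",
    }
}
```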

# Instance-aware counters

Enable the `instance-aware-counter` feature to access per-instance distributed
counters.

```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter"] }
```

Instance-aware counters track each running instance's contribution separately.
The cumulative total is the sum of all **live** instances' contributions. When an instance stops
heartbeating for longer than `dead_instance_threshold_ms` (default 30 s), its
contribution is automatically subtracted from the cumulative on the next
operation by any surviving instance.

This makes them well-suited for:

- **Connection pool sizing** -- each server reports its active connection count;
  the cumulative is the cluster-wide total.
- **Live session counting** -- contributions disappear naturally when a node
  restarts or crashes.
- **Per-node metrics** -- see both the global total and each instance's slice.

## StrictInstanceAwareCounter

Every call is immediately consistent with Redis. `set` and `del` bump a
per-key **epoch** that causes stale instances to reset their stored count on
their next operation, preventing double-counting.

```rust,no_run
# use distkit::icounter::{
#     InstanceAwareCounterTrait,
#     StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
# };
# use distkit::RedisKey;
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
    StrictInstanceAwareCounterOptions::new(prefix, conn),
);

let key = RedisKey::try_from("connections".to_string())?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;

// Decrement this instance's contribution.
let (total, mine) = counter.dec(&key, 2).await?;

// Read without modifying.
let (total, mine) = counter.get(&key).await?;

// Set this instance's slice to an exact value without bumping the epoch.
// Other instances are not affected.
let (total, mine) = counter.set_on_instance(&key, 10).await?;

// Set the global total to an exact value and bump the epoch, making all
// other instances' stored counts stale.
let (total, mine) = counter.set(&key, 100).await?;

// Remove only this instance's contribution (no epoch bump).
let (total, removed) = counter.del_on_instance(&key).await?;

// Delete the key globally and bump the epoch.
let (old_total, _) = counter.del(&key).await?;
# Ok(())
# }
```

### Dead-instance cleanup

Each instance sends a heartbeat on every operation. If a process silently dies,
surviving instances automatically remove its contribution the next time any of
them touches the same key.

```rust,no_run
# use distkit::icounter::{
#     InstanceAwareCounterTrait,
#     StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
# };
# use distkit::RedisKey;
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn1 = client.get_connection_manager().await?;
# let conn2 = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let key = RedisKey::try_from("connections".to_string())?;

// Two independent instances sharing the same prefix.
let opts = |conn| StrictInstanceAwareCounterOptions {
    prefix: prefix.clone(),
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000, // 30 s
};
let server_a = StrictInstanceAwareCounter::new(opts(conn1));
let server_b = StrictInstanceAwareCounter::new(opts(conn2));

server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key,  5).await?; // cumulative = 15

// server_a goes offline. After 30 s, server_b's next call removes its
// contribution automatically.
let (total, _) = server_b.get(&key).await?; // total = 5 once cleaned up
# Ok(())
# }
```

## LaxInstanceAwareCounter

A buffered wrapper around [`StrictInstanceAwareCounter`]. `inc` calls
accumulate locally and are flushed to the strict counter in bulk every
`flush_interval` (default 20 ms). Global operations (`set`, `del`, `clear`)
flush any pending delta first, then delegate immediately.

Use this when you have many `inc`/`dec` calls per second and can tolerate a
small consistency lag.

```rust,no_run
# use distkit::icounter::{
#     InstanceAwareCounterTrait,
#     LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
# };
# use distkit::RedisKey;
# use std::time::Duration;
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;

// Explicit options; LaxInstanceAwareCounterOptions::new(prefix, conn) would produce the same defaults.
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
    prefix,
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000,
    flush_interval: Duration::from_millis(20),
    allowed_lag:    Duration::from_millis(20),
});

let key = RedisKey::try_from("connections".to_string())?;

// Returns the local estimate immediately -- no Redis round-trip on the warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;

// Decrement also stays local until flushed.
let (local_total, mine) = counter.dec(&key, 1).await?;

// get() also returns the local estimate (cumulative + pending delta).
// A fresh instance with no local state falls back to the strict counter.
let (total, mine) = counter.get(&key).await?;
# Ok(())
# }
```

# Rate limiting (trypema)

Enable the `trypema` feature to access distributed rate limiting.

Trypema documentation website: <https://trypema.davidoyinbo.com>

```toml
[dependencies]
distkit = { version = "0.2", features = ["trypema"] }
```

All public types from the [`trypema`](https://docs.rs/trypema) crate are
re-exported under `distkit::trypema`. The module provides:

- **Sliding-window rate limiting** with configurable window size and rate.
- **Three providers** -- local (in-process via `DashMap`), Redis-backed
  (distributed enforcement via Lua scripts), and hybrid (local fast-path
  with periodic Redis sync).
- **Two strategies** -- absolute (binary allow/reject) and suppressed
  (probabilistic degradation that smoothly ramps rejection probability).

## Local rate limiting (absolute)

Use the local provider for in-process rate limiting with sub-microsecond
latency. The absolute strategy gives a deterministic allow/reject decision.

```rust,no_run
# use std::sync::Arc;
# use distkit::trypema::{
#     HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
#     RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
#     local::LocalRateLimiterOptions,
#     redis::RedisRateLimiterOptions,
#     hybrid::SyncIntervalMs,
# };
# fn example() {
# let connection_manager: redis::aio::ConnectionManager = todo!();
let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
#        redis: RedisRateLimiterOptions {
#        connection_manager,
#        prefix: None,
#        window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
#        rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
#        hard_limit_factor: HardLimitFactor::default(),
#        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
#        sync_interval_ms: SyncIntervalMs::default(),
#    },
}));

// Optional: start background cleanup for stale keys
rl.run_cleanup_loop();

let rate = RateLimit::try_from(10.0).unwrap(); // 10 requests per second

match rl.local().absolute().inc("user_123", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Process the request
    }
    RateLimitDecision::Rejected { retry_after_ms, .. } => {
        // Back off and retry later
        eprintln!("Rate limited, retry in {retry_after_ms} ms");
    }
    _ => {}
}
# }
```

## Local rate limiting (suppressed)

The suppressed strategy smoothly ramps rejection probability as load
approaches the limit, instead of a hard cutoff.

```rust,no_run
# use std::sync::Arc;
# use distkit::trypema::{
#     HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
#     RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
#     local::LocalRateLimiterOptions,
#     redis::RedisRateLimiterOptions,
#     hybrid::SyncIntervalMs,
# };
# fn example() {
# let connection_manager: redis::aio::ConnectionManager = todo!();
# let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
#     local: LocalRateLimiterOptions {
#         window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
#         rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
#         hard_limit_factor: HardLimitFactor::try_from(1.5).unwrap(),
#         suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
#     },
#     redis: RedisRateLimiterOptions {
#         connection_manager,
#         prefix: None,
#         window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
#         rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
#         hard_limit_factor: HardLimitFactor::try_from(1.5).unwrap(),
#         suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
#         sync_interval_ms: SyncIntervalMs::default(),
#     },
# }));
let rate = RateLimit::try_from(100.0).unwrap();

match rl.local().suppressed().inc("api_endpoint", &rate, 1) {
    RateLimitDecision::Allowed => {
        // Well under the limit
    }
    RateLimitDecision::Suppressed { is_allowed, suppression_factor } => {
        if is_allowed {
            // Approaching the limit but still admitted
            tracing::info!("Suppression factor: {suppression_factor:.2}");
        } else {
            // Probabilistically rejected
        }
    }
    RateLimitDecision::Rejected { .. } => {
        // Over the hard limit (rate * hard_limit_factor)
    }
}
# }
```
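
For the values configured in this example (`rate = 100.0` with a `hard_limit_factor` of 1.5 in the setup), the hard cutoff works out to 100 * 1.5 = 150 requests per second: under light load calls come back `Allowed`, the `Suppressed` branch applies with a rising rejection probability as throughput climbs toward the limit, and past 150 every call is `Rejected` outright.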

## Redis-backed rate limiting

For distributed enforcement across multiple processes or servers, use the
Redis provider. Each call executes an atomic Lua script.

```rust,no_run
# use std::sync::Arc;
# use distkit::trypema::{
#     HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
#     RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
#     local::LocalRateLimiterOptions,
#     redis::{RedisKey, RedisRateLimiterOptions},
#     hybrid::SyncIntervalMs,
# };
# async fn example() -> Result<(), Box<dyn std::error::Error>> {
# let client = redis::Client::open("redis://127.0.0.1/")?;
# let conn = client.get_connection_manager().await?;
let window = WindowSizeSeconds::try_from(60)?;
let bucket = RateGroupSizeMs::try_from(100)?;

let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
    redis: RedisRateLimiterOptions {
        connection_manager: conn,
        prefix: None,
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
        sync_interval_ms: SyncIntervalMs::default(),
    },
}));

rl.run_cleanup_loop();

let key = RedisKey::try_from("user_123".to_string())?;
let rate = RateLimit::try_from(50.0)?;

// Distributed absolute enforcement
let decision = rl.redis().absolute().inc(&key, &rate, 1).await?;

// Or use the hybrid provider for local fast-path with Redis sync
let decision = rl.hybrid().absolute().inc(&key, &rate, 1).await?;
# Ok(())
# }
```

See the [`trypema` documentation](https://docs.rs/trypema) for full API
details and advanced configuration.

[`StrictCounter`]: counter::StrictCounter
[`LaxCounter`]: counter::LaxCounter
[`CounterTrait`]: counter::CounterTrait
[`CounterOptions`]: counter::CounterOptions
[`StrictInstanceAwareCounter`]: icounter::StrictInstanceAwareCounter
[`LaxInstanceAwareCounter`]: icounter::LaxInstanceAwareCounter
[`InstanceAwareCounterTrait`]: icounter::InstanceAwareCounterTrait