# distkit

A toolkit of distributed systems primitives for Rust, backed by Redis.

[![Crates.io](https://img.shields.io/crates/v/distkit.svg)](https://crates.io/crates/distkit)
[![Documentation](https://docs.rs/distkit/badge.svg)](https://docs.rs/distkit)
[![License: MIT](https://img.shields.io/badge/License-MIT-blue.svg)](LICENSE)

## What is distkit?

distkit provides building blocks for distributed applications. It ships
distributed counters (strict and lax), instance-aware counters, and rate
limiting, all backed by Redis.

## Features

- **StrictCounter** -- every operation executes a Redis Lua script atomically.
  Reads always reflect the latest write. Best for billing, inventory, or
  anything where accuracy is critical.
- **LaxCounter** -- buffers increments in memory and flushes to Redis every
  ~20 ms. Sub-microsecond latency on the hot path. Best for analytics and
  high-throughput metrics.
- **Instance-aware counters** -- each running instance owns a named slice of the
  total, with automatic cleanup of contributions from instances that stop
  heartbeating.
- **Rate limiting** (opt-in `trypema` feature) -- sliding-window rate limiting
  with local, Redis-backed, and hybrid providers. Supports absolute and
  probabilistic suppression strategies.
- **Safe by default** -- `#![forbid(unsafe_code)]`, no panics in library code.

## Feature flags

| Feature                  | Default | Description                                                              |
| ------------------------ | ------- | ------------------------------------------------------------------------ |
| `counter`                | **yes** | Distributed counters (`StrictCounter`, `LaxCounter`)                     |
| `instance-aware-counter` | no      | Per-instance counters (`StrictInstanceAwareCounter`, `LaxInstanceAwareCounter`) |
| `trypema`                | no      | Rate limiting via the [trypema](https://docs.rs/trypema) crate           |

## Installation

```sh
cargo add distkit
```

Or add to `Cargo.toml`:

```toml
[dependencies]
distkit = "0.2"
```

To enable instance-aware counters or rate limiting:

```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter", "trypema"] }
```

distkit requires a running Redis instance (5.0+ for Lua script support).
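
If you don't have Redis handy, a throwaway Docker container works (the container name and port below are arbitrary choices, not anything distkit requires):

```sh
# Any Redis >= 5.0 works; 7.x shown here.
docker run --rm -d --name distkit-redis -p 6379:6379 redis:7

# The test suite reads this variable (see Development below):
export REDIS_URL="redis://127.0.0.1:6379/"
```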

## Quick start

```rust
use distkit::{RedisKey, counter::{StrictCounter, LaxCounter, CounterOptions, CounterTrait}};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let client = redis::Client::open("redis://127.0.0.1/")?;
    let conn = client.get_connection_manager().await?;

    let prefix = RedisKey::try_from("my_app".to_string())?;
    let options = CounterOptions::new(prefix, conn);

    let key = RedisKey::try_from("page_views".to_string())?;

    // Strict: immediate consistency
    let strict = StrictCounter::new(options.clone());
    strict.inc(&key, 1).await?;
    let total = strict.get(&key).await?;
    println!("strict: {total}");

    // Lax: eventual consistency, much faster
    let lax = LaxCounter::new(options);
    lax.inc(&key, 1).await?;
    let approx = lax.get(&key).await?;
    println!("lax: {approx}");

    Ok(())
}
```

## Counter types

### StrictCounter

Every call is a single Redis round-trip executing an atomic Lua script. The
counter value is always authoritative.

```rust
let key = RedisKey::try_from("orders".to_string())?;
strict.inc(&key, 1).await?;   // HINCRBY via Lua
strict.set(&key, 100).await?; // HSET via Lua
strict.del(&key).await?;      // HDEL, returns old value
strict.clear().await?;        // DEL on the hash
```

### LaxCounter

Writes are buffered in a local `DashMap` and flushed to Redis in batched
pipelines every `allowed_lag` (default 20 ms). Reads return the local view
(`remote_total + pending_delta`), which is always consistent within the same
process.

```rust
let key = RedisKey::try_from("impressions".to_string())?;
lax.inc(&key, 1).await?;         // local atomic add, sub-microsecond
let val = lax.get(&key).await?;  // reads local state, no Redis hit
```

A background Tokio task handles flushing. It holds a `Weak` reference to the
counter, so it stops automatically when the counter is dropped.
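
The shutdown pattern can be sketched with plain `std` threads (distkit itself uses a Tokio task, but the `Weak`-upgrade loop is the same idea; all names here are illustrative):

```rust
use std::sync::{Arc, Mutex, Weak};
use std::thread;
use std::time::Duration;

struct Counter {
    pending: Mutex<i64>, // buffered delta not yet flushed
}

fn spawn_flusher(weak: Weak<Counter>) -> thread::JoinHandle<u32> {
    thread::spawn(move || {
        let mut flushes = 0;
        // Loop as long as the counter is alive: Weak::upgrade returns None
        // once the last Arc is dropped, and the task exits on its own.
        while let Some(counter) = weak.upgrade() {
            let delta = std::mem::take(&mut *counter.pending.lock().unwrap());
            if delta != 0 {
                flushes += 1; // a real flusher would pipeline `delta` to Redis
            }
            drop(counter); // don't hold a strong ref across the sleep
            thread::sleep(Duration::from_millis(5));
        }
        flushes
    })
}

fn main() {
    let counter = Arc::new(Counter { pending: Mutex::new(0) });
    let handle = spawn_flusher(Arc::downgrade(&counter));

    *counter.pending.lock().unwrap() += 42;
    // Wait until the flusher has drained the buffer...
    while *counter.pending.lock().unwrap() != 0 {
        thread::sleep(Duration::from_millis(1));
    }
    drop(counter); // ...then drop the last strong reference:
    let flushes = handle.join().unwrap(); // the task notices and terminates
    assert_eq!(flushes, 1);
    println!("flusher flushed {flushes} batch and stopped");
}
```

Because the background task never holds a strong reference, dropping the counter is all the cleanup a caller needs to do.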

### Choosing a counter

|                           | `StrictCounter`                        | `LaxCounter`                       | `StrictInstanceAwareCounter`          | `LaxInstanceAwareCounter`                  |
| ------------------------- | -------------------------------------- | ---------------------------------- | ------------------------------------- | ------------------------------------------ |
| **Consistency**           | Immediate                              | Eventual (default: ~20 ms lag)     | Immediate                             | Eventual (`flush_interval` lag)            |
| **`inc` latency**         | Redis round-trip                       | Sub-microsecond (warm path)        | Redis round-trip                      | Sub-microsecond (warm path)                |
| **Redis I/O**             | Every operation                        | Batched on interval                | Every `inc`                           | Batched on interval                        |
| **`set` / `del`**         | Immediate                              | Immediate                          | Immediate (bumps epoch)               | Flushes pending delta, then immediate      |
| **Per-instance tracking** | No                                     | No                                 | Yes                                   | Yes                                        |
| **Dead-instance cleanup** | No                                     | No                                 | Yes                                   | Yes                                        |
| **Feature flag**          | `counter` (default)                    | `counter` (default)                | `instance-aware-counter`              | `instance-aware-counter`                   |
| **Use case**              | Billing, inventory, exact global count | Analytics, high-throughput metrics | Connection counts, exact live metrics | High-frequency per-node throughput metrics |

## Instance-aware counters

Enable the `instance-aware-counter` feature:

```toml
[dependencies]
distkit = { version = "0.2", features = ["instance-aware-counter"] }
```

Instance-aware counters track each running instance's contribution separately.
The cumulative total is the sum of all **live** instances. When an instance stops
heartbeating for longer than `dead_instance_threshold_ms` (default 30 s), its
contribution is automatically subtracted from the cumulative on the next
operation by any surviving instance.

This makes them well-suited for:

- **Connection pool sizing** -- each server reports its active connection count;
  the cumulative is the cluster-wide total.
- **Live session counting** -- contributions disappear naturally when a node
  restarts or crashes.
- **Per-node metrics** -- see both the global total and each instance's slice.

### StrictInstanceAwareCounter

Every call is immediately consistent with Redis. `set` and `del` bump a
per-key **epoch** that causes stale instances to reset their stored count on
their next operation, preventing double-counting.
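
The epoch check can be sketched against an in-memory stand-in for the Redis-side state (distkit does this in Lua against Redis hashes; the types and field names below are illustrative, not the crate's API):

```rust
use std::collections::HashMap;

#[derive(Default)]
struct KeyState {
    epoch: u64,
    cumulative: i64,
    slices: HashMap<String, i64>, // instance id -> stored contribution
}

impl KeyState {
    // `set` redefines the global total and bumps the epoch; every
    // per-instance slice stored before this point is now stale.
    fn set(&mut self, value: i64) {
        self.epoch += 1;
        self.cumulative = value;
    }

    // An instance that last wrote under `seen_epoch` must reset its stored
    // slice if the epoch moved: its old contribution is no longer part of
    // the cumulative, and keeping it would double-count later operations.
    fn inc(&mut self, id: &str, seen_epoch: &mut u64, delta: i64) -> (i64, i64) {
        if *seen_epoch != self.epoch {
            self.slices.remove(id); // stale slice: start from zero
            *seen_epoch = self.epoch;
        }
        let slice = self.slices.entry(id.to_string()).or_insert(0);
        *slice += delta;
        self.cumulative += delta;
        (self.cumulative, *slice)
    }
}

fn main() {
    let mut key = KeyState::default();
    let (mut ea, mut eb) = (0u64, 0u64);
    assert_eq!(key.inc("a", &mut ea, 10), (10, 10));
    assert_eq!(key.inc("b", &mut eb, 5), (15, 5));
    key.set(100); // bump epoch: every stored slice is stale now
    assert_eq!(key.inc("b", &mut eb, 1), (101, 1)); // b resets, no double count
    println!("epoch sketch ok");
}
```

Without the reset, instance `b` would re-add its pre-`set` slice of 5 on top of the new total.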

```rust
use distkit::icounter::{
    InstanceAwareCounterTrait,
    StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::RedisKey;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let counter = StrictInstanceAwareCounter::new(
    StrictInstanceAwareCounterOptions::new(prefix, conn),
);

let key = RedisKey::try_from("connections".to_string())?;

// Increment this instance's contribution; returns (cumulative, instance_count).
let (total, mine) = counter.inc(&key, 5).await?;

// Decrement this instance's contribution.
let (total, mine) = counter.dec(&key, 2).await?;

// Read without modifying.
let (total, mine) = counter.get(&key).await?;

// Set this instance's slice to an exact value without bumping the epoch.
let (total, mine) = counter.set_on_instance(&key, 10).await?;

// Set the global total to an exact value and bump the epoch.
let (total, mine) = counter.set(&key, 100).await?;

// Remove only this instance's contribution.
let (total, removed) = counter.del_on_instance(&key).await?;

// Delete the key globally and bump the epoch.
let (old_total, _) = counter.del(&key).await?;
```

#### Dead-instance cleanup

Each instance sends a heartbeat on every operation. If a process silently dies,
surviving instances automatically remove its contribution the next time any of
them touches the same key.

```rust
use distkit::icounter::{
    InstanceAwareCounterTrait,
    StrictInstanceAwareCounter, StrictInstanceAwareCounterOptions,
};
use distkit::RedisKey;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn1 = client.get_connection_manager().await?;
let conn2 = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let key = RedisKey::try_from("connections".to_string())?;

let opts = |conn| StrictInstanceAwareCounterOptions {
    prefix: prefix.clone(),
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000, // 30 s
};
let server_a = StrictInstanceAwareCounter::new(opts(conn1));
let server_b = StrictInstanceAwareCounter::new(opts(conn2));

server_a.inc(&key, 10).await?; // cumulative = 10
server_b.inc(&key,  5).await?; // cumulative = 15

// server_a goes offline. After 30 s, server_b's next call removes its
// contribution automatically.
let (total, _) = server_b.get(&key).await?; // total = 5 once cleaned up
```

### LaxInstanceAwareCounter

A buffered wrapper around `StrictInstanceAwareCounter`. `inc` calls accumulate
locally and are flushed to the strict counter in bulk every `flush_interval`
(default 20 ms). Global operations (`set`, `del`, `clear`) flush any pending
delta first, then delegate immediately.

Use this when you have many `inc`/`dec` calls per second and can tolerate a
small consistency lag.

```rust
use distkit::icounter::{
    InstanceAwareCounterTrait,
    LaxInstanceAwareCounter, LaxInstanceAwareCounterOptions,
};
use distkit::RedisKey;
use std::time::Duration;

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let prefix = RedisKey::try_from("my_app".to_string())?;
let counter = LaxInstanceAwareCounter::new(LaxInstanceAwareCounterOptions {
    prefix,
    connection_manager: conn,
    dead_instance_threshold_ms: 30_000,
    flush_interval: Duration::from_millis(20),
    allowed_lag:    Duration::from_millis(20),
});

let key = RedisKey::try_from("connections".to_string())?;

// Returns the local estimate immediately — no Redis round-trip on warm path.
let (local_total, mine) = counter.inc(&key, 1).await?;

// Decrement also stays local until flushed.
let (local_total, mine) = counter.dec(&key, 1).await?;

// get() also returns the local estimate (cumulative + pending delta).
let (total, mine) = counter.get(&key).await?;
```

## Rate limiting (trypema)

Enable the `trypema` feature to access sliding-window rate limiting.

Trypema documentation website: <https://trypema.davidoyinbo.com>

```toml
[dependencies]
distkit = { version = "0.2", features = ["trypema"] }
```

All public types from the [`trypema`](https://docs.rs/trypema) crate are
re-exported under `distkit::trypema`. The module provides:

- **Sliding-window rate limiting** with configurable window size and rate.
- **Three providers** -- local (in-process), Redis-backed (distributed), and
  hybrid (local fast-path with periodic Redis sync).
- **Two strategies** -- absolute (binary allow/reject) and suppressed
  (probabilistic degradation that smoothly ramps rejection probability).
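
The sliding-window idea itself fits in a few lines. This toy version stores raw timestamps for clarity, whereas trypema groups events into buckets of `RateGroupSizeMs`; the names below are illustrative, not trypema's API:

```rust
use std::collections::VecDeque;

/// Allow at most `limit` events in any `window_ms` span.
struct SlidingWindow {
    window_ms: u64,
    limit: usize,
    events: VecDeque<u64>, // timestamps (ms) of accepted events
}

impl SlidingWindow {
    fn new(window_ms: u64, limit: usize) -> Self {
        Self { window_ms, limit, events: VecDeque::new() }
    }

    fn allow(&mut self, now_ms: u64) -> bool {
        // Evict events that have fallen out of the window.
        while self.events.front().is_some_and(|&t| now_ms - t >= self.window_ms) {
            self.events.pop_front();
        }
        if self.events.len() < self.limit {
            self.events.push_back(now_ms);
            true
        } else {
            false
        }
    }
}

fn main() {
    let mut rl = SlidingWindow::new(1_000, 2); // 2 events per second
    assert!(rl.allow(0));
    assert!(rl.allow(100));
    assert!(!rl.allow(200));  // window is full
    assert!(rl.allow(1_050)); // the event at t=0 has expired
    println!("sliding-window sketch ok");
}
```

Bucketing trades a little precision for bounded memory: the limiter tracks one count per bucket instead of one entry per event.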

### Local rate limiting (absolute)

```rust
use std::sync::Arc;
use distkit::trypema::{
    HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
    RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
    local::LocalRateLimiterOptions,
};

let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: WindowSizeSeconds::try_from(60).unwrap(),
        rate_group_size_ms: RateGroupSizeMs::try_from(100).unwrap(),
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
}));

rl.run_cleanup_loop();

let rate = RateLimit::try_from(10.0).unwrap(); // 10 requests per second

match rl.local().absolute().inc("user_123", &rate, 1) {
    RateLimitDecision::Allowed => { /* process request */ }
    RateLimitDecision::Rejected { retry_after_ms, .. } => {
        eprintln!("Rate limited, retry in {retry_after_ms} ms");
    }
    _ => {}
}
```
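
The suppressed strategy, by contrast, rejects *probabilistically* once traffic exceeds the configured rate. One plausible shape for such a curve (a hypothetical helper for intuition only; trypema's actual formula may differ) ramps the rejection probability linearly between the rate limit and `hard_limit_factor * rate`:

```rust
/// Rejection probability: 0 at or below `limit`, 1 at or above
/// `limit * hard_limit_factor`, linear in between. Illustrative only.
fn suppression_probability(observed_rate: f64, limit: f64, hard_limit_factor: f64) -> f64 {
    let hard_limit = limit * hard_limit_factor;
    if observed_rate <= limit {
        0.0
    } else if observed_rate >= hard_limit {
        1.0
    } else {
        (observed_rate - limit) / (hard_limit - limit)
    }
}

fn main() {
    assert_eq!(suppression_probability(5.0, 10.0, 2.0), 0.0);  // under the limit
    assert_eq!(suppression_probability(15.0, 10.0, 2.0), 0.5); // halfway to hard limit
    assert_eq!(suppression_probability(25.0, 10.0, 2.0), 1.0); // at/over hard limit
    println!("suppression curve ok");
}
```

The appeal of this approach is graceful degradation: a slightly-over-limit client sees occasional rejections rather than a hard cutoff.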

### Redis-backed rate limiting

For distributed enforcement across multiple processes or servers:

```rust
use std::sync::Arc;
use distkit::trypema::{
    HardLimitFactor, RateGroupSizeMs, RateLimit, RateLimitDecision,
    RateLimiter, RateLimiterOptions, SuppressionFactorCacheMs, WindowSizeSeconds,
    local::LocalRateLimiterOptions,
    redis::{RedisKey, RedisRateLimiterOptions},
    hybrid::SyncIntervalMs,
};

let client = redis::Client::open("redis://127.0.0.1/")?;
let conn = client.get_connection_manager().await?;
let window = WindowSizeSeconds::try_from(60)?;
let bucket = RateGroupSizeMs::try_from(100)?;

let rl = Arc::new(RateLimiter::new(RateLimiterOptions {
    local: LocalRateLimiterOptions {
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
    },
    redis: RedisRateLimiterOptions {
        connection_manager: conn,
        prefix: None,
        window_size_seconds: window,
        rate_group_size_ms: bucket,
        hard_limit_factor: HardLimitFactor::default(),
        suppression_factor_cache_ms: SuppressionFactorCacheMs::default(),
        sync_interval_ms: SyncIntervalMs::default(),
    },
}));

rl.run_cleanup_loop();

let key = RedisKey::try_from("user_123".to_string())?;
let rate = RateLimit::try_from(50.0)?;

// Distributed absolute enforcement
let decision = rl.redis().absolute().inc(&key, &rate, 1).await?;

// Or use the hybrid provider for local fast-path with Redis sync
let decision = rl.hybrid().absolute().inc(&key, &rate, 1).await?;
```

See the [trypema documentation](https://docs.rs/trypema) for full API details
and advanced configuration.

## Development

### Prerequisites

- Rust (latest stable)
- Docker (for the test Redis instance)

### Commands

```sh
make test       # Start Redis, run tests, tear down
make bench      # Start Redis, run criterion benchmarks, tear down
make redis-up   # Start Redis on port 16379
make redis-down # Stop Redis and remove volumes
```

Tests and benchmarks require the `REDIS_URL` environment variable.
The `make` targets set this automatically.

## License

MIT