flowstats
Streaming algorithms for Rust.
Overview
flowstats provides space-efficient probabilistic data structures for processing data streams. These algorithms trade a small amount of accuracy for dramatic reductions in memory usage, making them ideal for:
- Real-time analytics - Count distinct users, track popular items
- Monitoring systems - Compute percentiles, detect anomalies
- Data pipelines - Aggregate statistics across distributed workers
- Resource-constrained environments - Process unbounded streams with fixed memory
Additionally:
- Most structures are mergeable for distributed computation (except Space-Saving)
- no_std environments are supported; see the no_std Support section below.
Features
| Category | Algorithm | Use Case |
|---|---|---|
| Cardinality | HyperLogLog | Count distinct elements (~0.8% error in 16 KB) |
| Frequency | Count-Min Sketch | Estimate item frequencies |
| Frequency | Space-Saving | Find top-k frequent items |
| Quantiles | t-digest | Estimate percentiles (p50, p99, etc.) |
| Membership | Bloom Filter | Test set membership (no false negatives) |
| Sampling | Reservoir Sampler | Uniform random sample from stream |
| Statistics | RunningStats | Mean, variance, min, max in one pass |
Installation
[dependencies]
flowstats = "0.1"
Or with specific features:
[dependencies]
flowstats = { version = "0.1", default-features = false, features = ["cardinality", "frequency"] }
Quick Start
Count Distinct Elements (HyperLogLog)
use HyperLogLog;
use CardinalitySketch;
let mut hll = new; // ~0.8% error, 16KB memory
for user_id in user_events
println!; // ≈ actual count ± 0.8%
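The "~0.8% error, 16KB" figure matches the standard HyperLogLog error bound of 1.04/√m for m registers. The sketch below works through that arithmetic. It assumes a precision of 14 bits and one byte per register, which is what 16 KB suggests; neither is confirmed here as the flowstats layout, and the function names are hypothetical.

```rust
/// Theoretical relative standard error of HyperLogLog with 2^precision registers.
fn hll_standard_error(precision: u32) -> f64 {
    let registers = (1u64 << precision) as f64;
    1.04 / registers.sqrt()
}

/// Memory footprint in bytes, ASSUMING one byte per register.
/// (An assumption for illustration, not a documented flowstats detail.)
fn hll_memory_bytes(precision: u32) -> usize {
    1usize << precision
}
```

Under these assumptions, precision 14 gives 16,384 registers, 16 KB of memory, and a standard error of 1.04/128, or about 0.81%.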
Estimate Percentiles (t-digest)
use TDigest;
use QuantileSketch;
let mut digest = new;
for latency in request_latencies
println!;
println!;
Find Top-K Items (Space-Saving)
use SpaceSaving;
let mut top_k = new; // Track top 100
for page in page_views
for in top_k.top_k
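To show what a top-k tracker with fixed memory does internally, here is a minimal sketch of the Space-Saving idea using only the standard library. `MiniSpaceSaving` and its methods are hypothetical names for illustration and are not the flowstats API; the linear scan for the minimum keeps the code short, where a real implementation would likely use a more efficient structure.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in: tracks at most `capacity` items with approximate counts.
struct MiniSpaceSaving {
    capacity: usize,
    counts: HashMap<String, u64>,
}

impl MiniSpaceSaving {
    fn new(capacity: usize) -> Self {
        assert!(capacity > 0, "capacity must be positive");
        Self { capacity, counts: HashMap::new() }
    }

    fn add(&mut self, item: &str) {
        // Already tracked: just bump its counter.
        if let Some(count) = self.counts.get_mut(item) {
            *count += 1;
            return;
        }
        // Free slot available: start tracking with count 1.
        if self.counts.len() < self.capacity {
            self.counts.insert(item.to_string(), 1);
            return;
        }
        // Full: evict the item with the smallest count. The newcomer inherits
        // that count plus one, so counts can overestimate but never underestimate.
        let (victim, min_count) = self
            .counts
            .iter()
            .min_by_key(|entry| *entry.1)
            .map(|(key, count)| (key.clone(), *count))
            .expect("capacity > 0, so the map is non-empty here");
        self.counts.remove(&victim);
        self.counts.insert(item.to_string(), min_count + 1);
    }

    /// Returns up to `k` items, highest count first (ties broken by name).
    fn top(&self, k: usize) -> Vec<(String, u64)> {
        let mut entries: Vec<(String, u64)> =
            self.counts.iter().map(|(key, count)| (key.clone(), *count)).collect();
        entries.sort_by(|a, b| b.1.cmp(&a.1).then_with(|| a.0.cmp(&b.0)));
        entries.truncate(k);
        entries
    }
}
```

Because the smallest counter can never exceed n/k after n items, any item occurring more than n/k times is guaranteed to still be tracked.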
Test Set Membership (Bloom Filter)
use BloomFilter;
let mut seen = new; // 1% false positive rate
for item in stream
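For readers who want to see how a false positive rate translates into memory, the following is a minimal Bloom filter sketch. It uses the textbook sizing formulas m = -n·ln(p)/(ln 2)² bits and k = (m/n)·ln 2 hash functions. `MiniBloom` is a hypothetical type, and it hashes with the standard library's `DefaultHasher` rather than xxHash, so it illustrates the technique and not the flowstats implementation.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a Bloom filter sized from a target error rate.
struct MiniBloom {
    bits: Vec<u64>,
    num_bits: u64,
    num_hashes: u32,
}

impl MiniBloom {
    fn with_rate(expected_items: usize, fp_rate: f64) -> Self {
        let ln2 = std::f64::consts::LN_2;
        let n = expected_items.max(1) as f64;
        // Optimal bit count and hash count for n items at the target rate.
        let m = (-(n * fp_rate.ln()) / (ln2 * ln2)).ceil().max(64.0) as u64;
        let k = ((m as f64 / n) * ln2).round().max(1.0) as u32;
        Self { bits: vec![0; ((m + 63) / 64) as usize], num_bits: m, num_hashes: k }
    }

    /// Two base hashes; the i-th probe is derived as h1 + i * h2 (double hashing).
    fn hash_pair<T: Hash>(item: &T) -> (u64, u64) {
        let mut h1 = DefaultHasher::new();
        item.hash(&mut h1);
        let mut h2 = DefaultHasher::new();
        0xA5A5_5A5Au32.hash(&mut h2); // salt so the second hash differs
        item.hash(&mut h2);
        (h1.finish(), h2.finish() | 1)
    }

    fn insert<T: Hash>(&mut self, item: &T) {
        let (a, b) = Self::hash_pair(item);
        for i in 0..self.num_hashes as u64 {
            let bit = a.wrapping_add(i.wrapping_mul(b)) % self.num_bits;
            self.bits[(bit / 64) as usize] |= 1u64 << (bit % 64);
        }
    }

    /// `false` means definitely absent; `true` means probably present.
    fn contains<T: Hash>(&self, item: &T) -> bool {
        let (a, b) = Self::hash_pair(item);
        (0..self.num_hashes as u64).all(|i| {
            let bit = a.wrapping_add(i.wrapping_mul(b)) % self.num_bits;
            self.bits[(bit / 64) as usize] & (1u64 << (bit % 64)) != 0
        })
    }
}
```

With these formulas, 100,000 expected items at a 1% false positive rate need roughly 958,500 bits (about 120 KB) and 7 hash functions.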
Estimate Frequencies (Count-Min Sketch)
use CountMinSketch;
let mut cms = new; // ε=0.1%, δ=1%
for event in events
let freq = cms.estimate;
println!; // Upper bound on true count
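The ε and δ parameters map onto table dimensions in the classic Count-Min construction: width = ⌈e/ε⌉ and depth = ⌈ln(1/δ)⌉. The sketch below illustrates that mapping and why the estimate is an upper bound. `MiniCountMin` is a hypothetical type built on the standard library's `DefaultHasher`; whether flowstats sizes its table the same way is an assumption.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in: `depth` rows of `width` counters, stored flat.
struct MiniCountMin {
    width: usize,
    depth: usize,
    table: Vec<u64>,
}

impl MiniCountMin {
    fn with_error(epsilon: f64, delta: f64) -> Self {
        let width = (std::f64::consts::E / epsilon).ceil() as usize;
        let depth = (1.0 / delta).ln().ceil() as usize;
        Self { width, depth, table: vec![0; width * depth] }
    }

    /// Position of `item` in the given row; the row number salts the hash.
    fn index<T: Hash>(&self, item: &T, row: usize) -> usize {
        let mut hasher = DefaultHasher::new();
        row.hash(&mut hasher);
        item.hash(&mut hasher);
        row * self.width + (hasher.finish() % self.width as u64) as usize
    }

    fn add<T: Hash>(&mut self, item: &T) {
        for row in 0..self.depth {
            let i = self.index(item, row);
            self.table[i] += 1;
        }
    }

    /// Collisions only ever add to a counter, so the minimum across rows
    /// is the tightest available upper bound on the true count.
    fn estimate<T: Hash>(&self, item: &T) -> u64 {
        (0..self.depth)
            .map(|row| self.table[self.index(item, row)])
            .min()
            .unwrap_or(0)
    }
}
```

For ε = 0.1% and δ = 1% this yields a table of 2719 × 5 counters, and the estimate exceeds the true count by more than ε·n with probability at most δ.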
Random Sampling (Reservoir)
use ReservoirSampler;
let mut sampler = new; // Keep 1000 samples
for record in unlimited_stream
// Each record had equal probability of being sampled
let sample = sampler.sample;
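The equal-probability property comes from the classic reservoir rule (Algorithm R): the i-th record replaces a random slot with probability k/i. Below is a minimal sketch of that rule. `MiniReservoir` and the tiny `XorShift64` generator are hypothetical stand-ins so the example needs no external crates; a real application would use a proper random number generator.

```rust
/// Tiny deterministic PRNG, used only to keep the example dependency-free.
struct XorShift64(u64);

impl XorShift64 {
    fn next_u64(&mut self) -> u64 {
        let mut x = self.0;
        x ^= x << 13;
        x ^= x >> 7;
        x ^= x << 17;
        self.0 = x;
        x
    }

    /// Value in 0..bound (slight modulo bias, acceptable for illustration).
    fn below(&mut self, bound: u64) -> u64 {
        self.next_u64() % bound
    }
}

/// Hypothetical stand-in for a fixed-size uniform sampler.
struct MiniReservoir<T> {
    capacity: usize,
    seen: u64,
    items: Vec<T>,
    rng: XorShift64,
}

impl<T> MiniReservoir<T> {
    fn new(capacity: usize, seed: u64) -> Self {
        // xorshift must not start from zero, hence the max(1).
        Self { capacity, seen: 0, items: Vec::with_capacity(capacity), rng: XorShift64(seed.max(1)) }
    }

    fn add(&mut self, item: T) {
        self.seen += 1;
        if self.items.len() < self.capacity {
            // Still filling the reservoir: keep everything.
            self.items.push(item);
        } else {
            // Pick j uniformly in 0..seen; keep the item only if j is a valid slot,
            // which happens with probability capacity / seen.
            let j = self.rng.below(self.seen) as usize;
            if j < self.capacity {
                self.items[j] = item;
            }
        }
    }

    fn sample(&self) -> &[T] {
        &self.items
    }
}
```

Memory stays at `capacity` items no matter how long the stream runs, which is what makes the approach suitable for unbounded input.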
Running Statistics
use RunningStats;
let mut stats = new;
for value in measurements
println!;
println!;
println!;
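One-pass mean and variance are usually computed with Welford's update, which avoids the cancellation problems of summing squares. The sketch below shows that update plus the pairwise merge formula of Chan et al. `MiniStats` is a hypothetical type, and whether flowstats uses exactly this formulation is an assumption; skipping NaN mirrors the behavior described under Error Handling.

```rust
/// Hypothetical stand-in for a one-pass statistics accumulator.
#[derive(Debug, Clone)]
struct MiniStats {
    count: u64,
    mean: f64,
    m2: f64, // sum of squared deviations from the running mean
    min: f64,
    max: f64,
}

impl MiniStats {
    fn new() -> Self {
        Self { count: 0, mean: 0.0, m2: 0.0, min: f64::INFINITY, max: f64::NEG_INFINITY }
    }

    fn add(&mut self, x: f64) {
        if x.is_nan() {
            return; // ignore NaN so it cannot poison the aggregates
        }
        self.count += 1;
        let delta = x - self.mean;
        self.mean += delta / self.count as f64;
        self.m2 += delta * (x - self.mean); // uses the updated mean
        self.min = self.min.min(x);
        self.max = self.max.max(x);
    }

    /// Sample variance (n - 1 denominator); None with fewer than two values.
    fn variance(&self) -> Option<f64> {
        if self.count < 2 {
            None
        } else {
            Some(self.m2 / (self.count - 1) as f64)
        }
    }

    /// Combine two accumulators as if all values had been fed to one.
    fn merge(&mut self, other: &Self) {
        if other.count == 0 {
            return;
        }
        if self.count == 0 {
            *self = other.clone();
            return;
        }
        let (n1, n2) = (self.count as f64, other.count as f64);
        let n = n1 + n2;
        let delta = other.mean - self.mean;
        self.mean += delta * n2 / n;
        self.m2 += other.m2 + delta * delta * n1 * n2 / n;
        self.count += other.count;
        self.min = self.min.min(other.min);
        self.max = self.max.max(other.max);
    }
}
```

For the values 2, 4, 4, 4, 5, 5, 7, 9 this gives a mean of 5 and a sample variance of 32/7, whether the values are added to one accumulator or split across two and merged.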
Distributed Aggregation
Most sketches support merging for distributed computation (Space-Saving is the exception):
use HyperLogLog;
use ;
// Worker 1
let mut hll1 = new;
for id in shard_1
// Worker 2
let mut hll2 = new;
for id in shard_2
// Coordinator
hll1.merge.unwrap;
println!;
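Merging works for HyperLogLog because the sketch is just an array of per-bucket maxima, so the union of two streams is the element-wise maximum of their registers. The following minimal sketch illustrates this. `MiniHll` is a hypothetical type that hashes with the standard library's `DefaultHasher` and uses the basic estimator with the small-range correction; it is not the flowstats implementation, and its `String` error stands in for whatever error type the library returns.

```rust
use std::collections::hash_map::DefaultHasher;
use std::hash::{Hash, Hasher};

/// Hypothetical stand-in for a mergeable cardinality sketch.
struct MiniHll {
    precision: u32,
    registers: Vec<u8>,
}

impl MiniHll {
    fn new(precision: u32) -> Self {
        assert!((7..=16).contains(&precision), "precision must be in 7..=16");
        Self { precision, registers: vec![0; 1usize << precision] }
    }

    fn add<T: Hash>(&mut self, item: &T) {
        let mut hasher = DefaultHasher::new();
        item.hash(&mut hasher);
        let hash = hasher.finish();
        // The top `precision` bits choose the register.
        let idx = (hash >> (64 - self.precision)) as usize;
        // Rank = 1-based position of the first 1 bit among the remaining bits.
        let rest = hash << self.precision;
        let rank = (rest.leading_zeros().min(64 - self.precision) + 1) as u8;
        if rank > self.registers[idx] {
            self.registers[idx] = rank;
        }
    }

    fn estimate(&self) -> f64 {
        let m = self.registers.len() as f64;
        let alpha = 0.7213 / (1.0 + 1.079 / m); // bias constant, valid for m >= 128
        let sum: f64 = self.registers.iter().map(|&r| 2f64.powi(-(r as i32))).sum();
        let raw = alpha * m * m / sum;
        let zeros = self.registers.iter().filter(|&&r| r == 0).count();
        if raw <= 2.5 * m && zeros > 0 {
            m * (m / zeros as f64).ln() // small-range correction (linear counting)
        } else {
            raw
        }
    }

    /// Union of two sketches: element-wise maximum of the registers.
    fn merge(&mut self, other: &Self) -> Result<(), String> {
        if self.precision != other.precision {
            return Err(format!("precision mismatch: {} vs {}", self.precision, other.precision));
        }
        for (mine, theirs) in self.registers.iter_mut().zip(&other.registers) {
            *mine = (*mine).max(*theirs);
        }
        Ok(())
    }
}
```

Two workers that see overlapping IDs produce, after merging, exactly the same registers as a single sketch fed the whole stream, so duplicates across shards are not double-counted.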
Algorithm Selection Guide
| Need | Algorithm | Memory | Error |
|---|---|---|---|
| Count unique items | HyperLogLog | 16 KB | ~0.8% typical |
| Percentiles (p50, p99) | t-digest | Variable | Good tail accuracy* |
| Top-k items | Space-Saving | O(k) | Items with freq > n/k captured |
| Item frequencies | Count-Min Sketch | Configurable | Overestimates by ≤ε·n |
| Set membership | Bloom Filter | ~120 KB per 100k items | 1% false positive (configurable) |
| Random sample | Reservoir | O(k) | Uniform over stream |
| Mean/variance | RunningStats | 48 bytes | Exact |
*t-digest accuracy depends on compression parameter and data distribution.
Feature Flags
[features]
default = ["std", "cardinality", "frequency", "quantiles"]
= ["cardinality", "frequency", "quantiles", "membership", "sampling", "statistics"]
# Algorithm families
cardinality = [] # HyperLogLog
frequency = [] # Count-Min Sketch, Space-Saving
quantiles = [] # t-digest
membership = [] # Bloom Filter
sampling = [] # Reservoir Sampler
statistics = [] # RunningStats
# Optional
std = [] # Use standard library (disable for no_std)
serde = ["dep:serde"] # Serialization support
no_std Support
flowstats supports no_std environments that have the alloc crate.
Cargo.toml:
[dependencies]
flowstats = { version = "0.1", default-features = false, features = ["cardinality"] }
Usage:
extern crate alloc;
use alloc::format;
use HyperLogLog;
use CardinalitySketch;
// You must provide a global allocator for your target
Requirements:
- Target must support the alloc crate (heap allocation)
- You must provide a global allocator (#[global_allocator])
- libm is included automatically for floating-point math
Parallel Processing
Sketches are not internally synchronized, but they're designed for parallel workloads via the merge pattern:
use std::thread;
use HyperLogLog;
use ;
// One sketch per thread, merge at the end
let handles: = data_shards.into_iter.map.collect;
// Merge results
let mut combined = new;
for handle in handles
For concurrent access to a single sketch, wrap it in Arc<Mutex<_>>.
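The two approaches can be compared side by side with a self-contained sketch. `CountMax` below is a deliberately trivial, hypothetical stand-in for any mergeable sketch (it is not a flowstats type), so the example shows only the threading pattern: one local accumulator per thread merged at the end, versus a single accumulator shared behind `Arc<Mutex<_>>`.

```rust
use std::sync::{Arc, Mutex};
use std::thread;

/// Trivial mergeable accumulator standing in for a real sketch.
#[derive(Default, Debug, Clone, PartialEq)]
struct CountMax {
    count: u64,
    max: Option<u64>,
}

impl CountMax {
    fn add(&mut self, value: u64) {
        self.count += 1;
        self.max = Some(self.max.map_or(value, |m| m.max(value)));
    }

    fn merge(&mut self, other: &Self) {
        self.count += other.count;
        self.max = match (self.max, other.max) {
            (Some(a), Some(b)) => Some(a.max(b)),
            (a, b) => a.or(b),
        };
    }
}

/// Pattern 1: each thread owns its accumulator; no locking on the hot path.
fn merge_pattern(shards: Vec<Vec<u64>>) -> CountMax {
    let handles: Vec<thread::JoinHandle<CountMax>> = shards
        .into_iter()
        .map(|shard| {
            thread::spawn(move || {
                let mut local = CountMax::default();
                for value in shard {
                    local.add(value);
                }
                local
            })
        })
        .collect();

    let mut combined = CountMax::default();
    for handle in handles {
        combined.merge(&handle.join().expect("worker thread panicked"));
    }
    combined
}

/// Pattern 2: one shared accumulator; every update takes the lock.
fn shared_pattern(shards: Vec<Vec<u64>>) -> CountMax {
    let shared = Arc::new(Mutex::new(CountMax::default()));
    let handles: Vec<thread::JoinHandle<()>> = shards
        .into_iter()
        .map(|shard| {
            let shared = Arc::clone(&shared);
            thread::spawn(move || {
                for value in shard {
                    shared.lock().expect("mutex poisoned").add(value);
                }
            })
        })
        .collect();
    for handle in handles {
        handle.join().expect("worker thread panicked");
    }
    let result = shared.lock().expect("mutex poisoned").clone();
    result
}
```

Both functions return the same result; the merge pattern generally scales better because threads never contend for a lock while ingesting data.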
Error Handling
- NaN values: Silently ignored in t-digest and RunningStats
- Merge errors: Return MergeError for incompatible configurations
- Serde validation: Deserialize validates invariants
Implementation Notes
- Hashing: Uses xxHash (xxh3) for speed. Deterministic within a process but not across versions.
- MSRV: Minimum supported Rust version is 1.75.0
- Stability: Pre-1.0 - API may change between minor versions
Roadmap
- Cuckoo Filter (better than Bloom for deletions)
- Theta Sketch (set operations on cardinalities)
- MinHash (Jaccard similarity)
- DDSketch (relative error quantiles)
- KLL Sketch (alternative quantiles)
- SIMD optimizations
License
Licensed under either of:
- Apache License, Version 2.0 (LICENSE-APACHE or http://www.apache.org/licenses/LICENSE-2.0)
- MIT license (LICENSE-MIT or http://opensource.org/licenses/MIT)
at your option.