Expand description
NATS JetStream distributed counters.
This crate provides support for distributed counters using NATS JetStream streams
configured with the AllowMsgCounter option.
§Overview
The counters module wraps JetStream streams configured with AllowMsgCounter to provide
distributed counters. Each subject in the stream represents a separate counter.
Counters are tracked across multiple sources, allowing for aggregation and source history. The module supports operations like incrementing/decrementing counters, loading current values, and retrieving source contributions.
§Key Features
- Arbitrary Precision: Uses
BigIntfor unlimited integer size - Source Tracking: Track counter contributions from multiple streams
- Batch Operations: Efficiently fetch multiple counter values
- Atomic Operations: Server-side atomic increment/decrement operations
§Stream Requirements
Streams must have the following configuration:
allow_message_counter: true- Enables counter functionalityallow_direct: true- Required for efficient counter reads
§Quick Start
use nats_counters::CounterExt;
use async_nats::jetstream::stream::Config;
// Connect to NATS and create JetStream context
let client = async_nats::connect("localhost:4222").await?;
let js = async_nats::jetstream::new(client);
// Create a counter-enabled stream
let config = Config {
name: "COUNTERS".to_string(),
subjects: vec!["counters.>".to_string()],
allow_message_counter: true,
allow_direct: true,
..Default::default()
};
js.create_stream(config).await?;
// Get the counter
let counter = js.get_counter("COUNTERS").await?;
// Increment a counter
let value = counter.add("counters.visits", 1).await?;
println!("Visits: {}", value);
// Read current value
let current = counter.load("counters.visits").await?;
println!("Current visits: {}", current);§Source Tracking
When using stream sourcing, counters track contributions from each source stream:
// Get counter with source information
let entry = counter.get("counters.total").await?;
println!("Total: {}", entry.value);
// Show breakdown by source
for (stream, subjects) in &entry.sources {
for (subject, value) in subjects {
println!(" {} contributed {} from {}", stream, value, subject);
}
}Re-exports§
pub use errors::CounterError;pub use errors::CounterErrorKind;pub use errors::Result;pub use counter_ext::CounterExt;
Modules§
- counter_
ext - Extension trait for JetStream Context to work with counters.
- errors
- parser
- Parser utilities for counter values and sources.
Structs§
- Counter
- Implementation of distributed counters using a JetStream stream.
- Entry
- Represents a counter’s current state with full source history.
Constants§
- COUNTER_
INCREMENT_ HEADER - Header key used to indicate counter increment values.
- COUNTER_
SOURCES_ HEADER - Header key used to store source contributions in counter messages.
Type Aliases§
- Counter
Sources - Map of source streams to their subject-value contributions.