Crate nats_counters

NATS JetStream distributed counters.

This crate provides distributed counters backed by NATS JetStream streams configured with the AllowMsgCounter option (the allow_message_counter field of the stream Config).

§Overview

The crate wraps JetStream streams configured with AllowMsgCounter to provide distributed counters. Each subject in the stream represents a separate counter.

Counters are tracked across multiple sources, which allows aggregation and keeps a per-source history. The crate supports incrementing and decrementing counters, loading current values, and retrieving per-source contributions.

§Key Features

  • Arbitrary Precision: Uses BigInt for unlimited integer size
  • Source Tracking: Track counter contributions from multiple streams
  • Batch Operations: Efficiently fetch multiple counter values
  • Atomic Operations: Server-side atomic increment/decrement operations
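
To illustrate why arbitrary precision matters, the following sketch shows what happens to a fixed-width counter at its upper bound. It is not part of this crate: add_fixed_width is a hypothetical helper that uses only the standard library, and it stands in for any counter stored as an i64.

```rust
/// Hypothetical helper (not part of nats_counters): add `delta` to a
/// fixed-width counter, returning `None` if the result would overflow.
fn add_fixed_width(current: i64, delta: i64) -> Option<i64> {
    // `checked_add` reports overflow instead of wrapping or panicking.
    current.checked_add(delta)
}
```

Calling add_fixed_width(i64::MAX, 1) yields None, whereas a BigInt-backed counter has no such ceiling.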

§Stream Requirements

Streams must have the following configuration:

  • allow_message_counter: true - Enables counter functionality
  • allow_direct: true - Required for efficient counter reads
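
The two requirements above can be checked before a stream is used as a counter. The sketch below is an assumption-laden illustration: StreamFlags and missing_requirements are hypothetical names, not part of this crate or of async_nats, and the struct only mirrors the two flags listed above.

```rust
/// Hypothetical mirror of the two stream flags listed above
/// (not a type from nats_counters or async_nats).
#[derive(Debug, Clone, Copy)]
struct StreamFlags {
    allow_message_counter: bool,
    allow_direct: bool,
}

/// Returns the names of the required flags that are not enabled.
/// An empty vector means the stream meets both requirements.
fn missing_requirements(flags: StreamFlags) -> Vec<&'static str> {
    let mut missing = Vec::new();
    if !flags.allow_message_counter {
        missing.push("allow_message_counter");
    }
    if !flags.allow_direct {
        missing.push("allow_direct");
    }
    missing
}
```

In practice the two booleans would be copied from the stream's actual configuration; reporting all missing flags at once, rather than failing on the first, makes misconfiguration easier to diagnose.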

§Quick Start

use nats_counters::CounterExt;
use async_nats::jetstream::stream::Config;

// Connect to NATS and create JetStream context
let client = async_nats::connect("localhost:4222").await?;
let js = async_nats::jetstream::new(client);

// Create a counter-enabled stream
let config = Config {
    name: "COUNTERS".to_string(),
    subjects: vec!["counters.>".to_string()],
    allow_message_counter: true,
    allow_direct: true,
    ..Default::default()
};
js.create_stream(config).await?;

// Get the counter
let counter = js.get_counter("COUNTERS").await?;

// Increment a counter
let value = counter.add("counters.visits", 1).await?;
println!("Visits: {}", value);

// Read current value
let current = counter.load("counters.visits").await?;
println!("Current visits: {}", current);

§Source Tracking

When using stream sourcing, counters track contributions from each source stream:

// Get counter with source information
let entry = counter.get("counters.total").await?;
println!("Total: {}", entry.value);

// Show breakdown by source
for (stream, subjects) in &entry.sources {
    for (subject, value) in subjects {
        println!("  {} contributed {} from {}", stream, value, subject);
    }
}
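
The nested map walked above (source stream, then subject, then value) can also be aggregated. The following is a minimal, dependency-free sketch: Sources is a stand-in for the crate's CounterSources alias, the helper names are hypothetical, and i128 replaces the BigInt values the crate is described as using.

```rust
use std::collections::HashMap;

// Stand-in for `CounterSources`: source stream -> (subject -> value).
// Assumption: the real alias holds BigInt values; i128 is used here
// only to keep the sketch free of external crates.
type Sources = HashMap<String, HashMap<String, i128>>;

/// Hypothetical helper: sum each source stream's contributions
/// across all of its subjects.
fn totals_by_stream(sources: &Sources) -> HashMap<String, i128> {
    sources
        .iter()
        .map(|(stream, subjects)| (stream.clone(), subjects.values().sum::<i128>()))
        .collect()
}

/// Hypothetical helper: sum every contribution from every source stream.
fn grand_total(sources: &Sources) -> i128 {
    sources
        .values()
        .flat_map(|subjects| subjects.values())
        .sum::<i128>()
}
```

With real data, the map would come from entry.sources; the helpers only read it, so they can run on a borrowed Entry without cloning the values.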

Re-exports§

pub use errors::CounterError;
pub use errors::CounterErrorKind;
pub use errors::Result;
pub use counter_ext::CounterExt;

Modules§

counter_ext
Extension trait for JetStream Context to work with counters.
errors
parser
Parser utilities for counter values and sources.

Structs§

Counter
Implementation of distributed counters using a JetStream stream.
Entry
Represents a counter’s current state with full source history.

Constants§

COUNTER_INCREMENT_HEADER
Header key used to indicate counter increment values.
COUNTER_SOURCES_HEADER
Header key used to store source contributions in counter messages.

Type Aliases§

CounterSources
Map of source streams to their subject-value contributions.