// nats_counters/lib.rs

// Copyright 2025 Synadia Communications Inc.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

//! NATS JetStream distributed counters.
//!
//! This crate provides support for distributed counters using NATS JetStream streams
//! configured with the `AllowMsgCounter` option.
//!
//! # Overview
//!
//! The counters module wraps JetStream streams configured with `AllowMsgCounter` to provide
//! distributed counters. Each subject in the stream represents a separate counter.
//!
//! Counters are tracked across multiple sources, allowing for aggregation and source history.
//! The module supports operations like incrementing/decrementing counters, loading current values,
//! and retrieving source contributions.
//!
//! ## Key Features
//!
//! - **Arbitrary Precision**: Uses `BigInt` for unlimited integer size
//! - **Source Tracking**: Track counter contributions from multiple streams
//! - **Batch Operations**: Efficiently fetch multiple counter values
//! - **Atomic Operations**: Server-side atomic increment/decrement operations
//!
//! ## Stream Requirements
//!
//! Streams must have the following configuration:
//! - `allow_message_counter: true` - Enables counter functionality
//! - `allow_direct: true` - Required for efficient counter reads
//!
//! # Quick Start
//!
//! ```no_run
//! use nats_counters::CounterExt;
//! use async_nats::jetstream::stream::Config;
//!
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
//! // Connect to NATS and create JetStream context
//! let client = async_nats::connect("localhost:4222").await?;
//! let js = async_nats::jetstream::new(client);
//!
//! // Create a counter-enabled stream
//! let config = Config {
//!     name: "COUNTERS".to_string(),
//!     subjects: vec!["counters.>".to_string()],
//!     allow_message_counter: true,
//!     allow_direct: true,
//!     ..Default::default()
//! };
//! js.create_stream(config).await?;
//!
//! // Get the counter
//! let counter = js.get_counter("COUNTERS").await?;
//!
//! // Increment a counter
//! let value = counter.add("counters.visits", 1).await?;
//! println!("Visits: {}", value);
//!
//! // Read current value
//! let current = counter.load("counters.visits").await?;
//! println!("Current visits: {}", current);
//! # Ok(())
//! # }
//! ```
//!
//! # Source Tracking
//!
//! When using stream sourcing, counters track contributions from each source stream:
//!
//! ```no_run
//! # async fn example(counter: &nats_counters::Counter) -> Result<(), Box<dyn std::error::Error>> {
//! // Get counter with source information
//! let entry = counter.get("counters.total").await?;
//! println!("Total: {}", entry.value);
//!
//! // Show breakdown by source
//! for (stream, subjects) in &entry.sources {
//!     for (subject, value) in subjects {
//!         println!("  {} contributed {} from {}", stream, value, subject);
//!     }
//! }
//! # Ok(())
//! # }
//! ```

95pub mod errors;
96pub mod parser;
97
98use num_bigint::BigInt;
99use std::collections::HashMap;
100
101pub use errors::{CounterError, CounterErrorKind, Result};
102
/// Name of the message header that stores per-source contributions for a counter.
pub const COUNTER_SOURCES_HEADER: &str = "Nats-Counter-Sources";

/// Name of the message header that carries a counter increment value.
pub const COUNTER_INCREMENT_HEADER: &str = "Nats-Incr";

109/// Map of source streams to their subject-value contributions.
110pub type CounterSources = HashMap<String, HashMap<String, BigInt>>;
111
112/// Represents a counter's current state with full source history.
113#[derive(Debug, Clone)]
114pub struct Entry {
115    /// The counter's subject name.
116    pub subject: String,
117
118    /// The current counter value.
119    pub value: BigInt,
120
121    /// Maps source identifiers to their subject-value contributions.
122    pub sources: CounterSources,
123
124    /// Most recent increment value for this entry.
125    /// Useful for recounting and auditing purposes.
126    pub increment: Option<BigInt>,
127}
128
129// Module for the extension trait
130pub mod counter_ext;
131
132// Module for the counter implementation
133mod counter;
134
135// Re-export main types
136pub use counter::Counter;
137pub use counter_ext::CounterExt;