nats_counters/
parser.rs

1// Copyright 2025 Synadia Communications Inc.
2// Licensed under the Apache License, Version 2.0 (the "License");
3// you may not use this file except in compliance with the License.
4// You may obtain a copy of the License at
5//
6// http://www.apache.org/licenses/LICENSE-2.0
7//
8// Unless required by applicable law or agreed to in writing, software
9// distributed under the License is distributed on an "AS IS" BASIS,
10// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
11// See the License for the specific language governing permissions and
12// limitations under the License.
13
14//! Parser utilities for counter values and sources.
15
16use crate::CounterSources;
17use crate::errors::{CounterError, CounterErrorKind, Result};
18use async_nats::HeaderMap;
19use num_bigint::BigInt;
20use serde::{Deserialize, Serialize};
21use std::collections::HashMap;
22use std::str::FromStr;
23
24/// JSON structure for counter value payload.
25#[derive(Debug, Serialize, Deserialize)]
26struct CounterPayload {
27    val: String,
28}
29
30/// Parses a counter value from message data.
31///
32/// Counter values are stored as JSON in the format: `{"val": "123"}`
33///
34/// # Arguments
35///
36/// * `data` - The raw message data containing the JSON payload
37///
38/// # Returns
39///
40/// A `BigInt` representing the counter value.
41pub fn parse_counter_value(data: &[u8]) -> Result<BigInt> {
42    if data.is_empty() {
43        return Err(CounterError::new(CounterErrorKind::InvalidCounterValue));
44    }
45
46    let payload: CounterPayload = serde_json::from_slice(data)
47        .map_err(|e| CounterError::with_source(CounterErrorKind::Serialization, e))?;
48
49    BigInt::from_str(&payload.val)
50        .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue))
51}
52
53/// Parses a counter value from PubAck value field.
54///
55/// When publishing to a stream with counters enabled, the PubAck response
56/// includes a `value` field containing the current counter value as a string.
57///
58/// # Arguments
59///
60/// * `value` - Optional string value from PubAck
61///
62/// # Returns
63///
64/// A `BigInt` representing the counter value.
65pub fn parse_counter_value_from_string(value: Option<String>) -> Result<BigInt> {
66    match value {
67        Some(val_str) => BigInt::from_str(&val_str)
68            .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue)),
69        None => Err(CounterError::new(CounterErrorKind::MissingResponseValue)),
70    }
71}
72
73/// Parses counter sources from message headers.
74///
75/// Sources are stored in the `Nats-Counter-Sources` header as JSON.
76/// The format is: `{"source1": {"subject1": "value1", "subject2": "value2"}}`
77///
78/// # Arguments
79///
80/// * `headers` - The message headers containing the sources information
81///
82/// # Returns
83///
84/// A `CounterSources` map or `None` if no sources header is present.
85pub fn parse_sources(headers: &HeaderMap) -> Result<CounterSources> {
86    let sources_header = headers.get(crate::COUNTER_SOURCES_HEADER);
87
88    match sources_header {
89        Some(header_value) => {
90            let header_str = header_value.as_str();
91
92            // Parse as JSON map of source -> subject -> value (as strings)
93            let sources_json: HashMap<String, HashMap<String, String>> =
94                serde_json::from_str(header_str)
95                    .map_err(|e| CounterError::with_source(CounterErrorKind::Serialization, e))?;
96
97            // Convert string values to BigInt
98            let mut sources = HashMap::new();
99            for (source_id, subjects) in sources_json {
100                let mut subject_values = HashMap::new();
101                for (subject, value_str) in subjects {
102                    let value = BigInt::from_str(&value_str)
103                        .map_err(|_| CounterError::new(CounterErrorKind::InvalidSources))?;
104                    subject_values.insert(subject, value);
105                }
106                sources.insert(source_id, subject_values);
107            }
108
109            Ok(sources)
110        }
111        None => Ok(HashMap::new()),
112    }
113}
114
115/// Parses an increment value from message headers.
116///
117/// The increment value is stored in the `Nats-Incr` header.
118///
119/// # Arguments
120///
121/// * `headers` - The message headers containing the increment value
122///
123/// # Returns
124///
125/// An optional `BigInt` representing the increment value.
126pub fn parse_increment(headers: &HeaderMap) -> Result<Option<BigInt>> {
127    let increment_header = headers.get(crate::COUNTER_INCREMENT_HEADER);
128
129    match increment_header {
130        Some(header_value) => {
131            let increment_str = header_value.as_str();
132
133            let increment = BigInt::from_str(increment_str)
134                .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue))?;
135
136            Ok(Some(increment))
137        }
138        None => Ok(None),
139    }
140}
141
142#[cfg(test)]
143mod tests {
144    use super::*;
145
146    #[test]
147    fn test_parse_counter_value_valid() {
148        let data = br#"{"val": "123"}"#;
149        let result = parse_counter_value(data).unwrap();
150        assert_eq!(result, BigInt::from(123));
151    }
152
153    #[test]
154    fn test_parse_counter_value_negative() {
155        let data = br#"{"val": "-456"}"#;
156        let result = parse_counter_value(data).unwrap();
157        assert_eq!(result, BigInt::from(-456));
158    }
159
160    #[test]
161    fn test_parse_counter_value_large() {
162        let data = br#"{"val": "999999999999999999999999999999"}"#;
163        let result = parse_counter_value(data).unwrap();
164        assert_eq!(
165            result,
166            BigInt::from_str("999999999999999999999999999999").unwrap()
167        );
168    }
169
170    #[test]
171    fn test_parse_counter_value_empty() {
172        let data = b"";
173        let result = parse_counter_value(data);
174        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidCounterValue));
175    }
176
177    #[test]
178    fn test_parse_counter_value_invalid_json() {
179        let data = b"not json";
180        let result = parse_counter_value(data);
181        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::Serialization));
182    }
183
184    #[test]
185    fn test_parse_counter_value_invalid_number() {
186        let data = br#"{"val": "not a number"}"#;
187        let result = parse_counter_value(data);
188        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
189    }
190
191    #[test]
192    fn test_parse_sources_empty() {
193        let headers = HeaderMap::new();
194        let result = parse_sources(&headers).unwrap();
195        assert!(result.is_empty());
196    }
197
198    #[test]
199    fn test_parse_sources_valid() {
200        let mut headers = HeaderMap::new();
201        headers.insert(
202            crate::COUNTER_SOURCES_HEADER,
203            r#"{"stream1": {"subject1": "100", "subject2": "200"}}"#,
204        );
205
206        let result = parse_sources(&headers).unwrap();
207        assert_eq!(result.len(), 1);
208        assert!(result.contains_key("stream1"));
209
210        let stream1 = &result["stream1"];
211        assert_eq!(stream1.len(), 2);
212        assert_eq!(stream1["subject1"], BigInt::from(100));
213        assert_eq!(stream1["subject2"], BigInt::from(200));
214    }
215
216    #[test]
217    fn test_parse_sources_multiple_streams() {
218        let mut headers = HeaderMap::new();
219        headers.insert(
220            crate::COUNTER_SOURCES_HEADER,
221            r#"{"stream1": {"sub1": "10"}, "stream2": {"sub2": "20"}}"#,
222        );
223
224        let result = parse_sources(&headers).unwrap();
225        assert_eq!(result.len(), 2);
226        assert_eq!(result["stream1"]["sub1"], BigInt::from(10));
227        assert_eq!(result["stream2"]["sub2"], BigInt::from(20));
228    }
229
230    #[test]
231    fn test_parse_increment_present() {
232        let mut headers = HeaderMap::new();
233        headers.insert(crate::COUNTER_INCREMENT_HEADER, "42");
234
235        let result = parse_increment(&headers).unwrap();
236        assert_eq!(result, Some(BigInt::from(42)));
237    }
238
239    #[test]
240    fn test_parse_increment_negative() {
241        let mut headers = HeaderMap::new();
242        headers.insert(crate::COUNTER_INCREMENT_HEADER, "-10");
243
244        let result = parse_increment(&headers).unwrap();
245        assert_eq!(result, Some(BigInt::from(-10)));
246    }
247
248    #[test]
249    fn test_parse_increment_absent() {
250        let headers = HeaderMap::new();
251        let result = parse_increment(&headers).unwrap();
252        assert_eq!(result, None);
253    }
254
255    #[test]
256    fn test_parse_increment_invalid() {
257        let mut headers = HeaderMap::new();
258        headers.insert(crate::COUNTER_INCREMENT_HEADER, "not_a_number");
259
260        let result = parse_increment(&headers);
261        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
262    }
263
264    #[test]
265    fn test_parse_counter_value_from_string_valid() {
266        let value = Some("42".to_string());
267        let result = parse_counter_value_from_string(value).unwrap();
268        assert_eq!(result, BigInt::from(42));
269    }
270
271    #[test]
272    fn test_parse_counter_value_from_string_large() {
273        let value = Some("999999999999999999999999".to_string());
274        let result = parse_counter_value_from_string(value).unwrap();
275        assert_eq!(
276            result,
277            BigInt::from_str("999999999999999999999999").unwrap()
278        );
279    }
280
281    #[test]
282    fn test_parse_counter_value_from_string_none() {
283        let result = parse_counter_value_from_string(None);
284        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::MissingResponseValue));
285    }
286
287    #[test]
288    fn test_parse_counter_value_from_string_invalid() {
289        let value = Some("not_a_number".to_string());
290        let result = parse_counter_value_from_string(value);
291        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
292    }
293}