1use crate::CounterSources;
17use crate::errors::{CounterError, CounterErrorKind, Result};
18use async_nats::HeaderMap;
19use num_bigint::BigInt;
20use serde::{Deserialize, Serialize};
21use std::collections::HashMap;
22use std::str::FromStr;
23
24#[derive(Debug, Serialize, Deserialize)]
26struct CounterPayload {
27 val: String,
28}
29
30pub fn parse_counter_value(data: &[u8]) -> Result<BigInt> {
42 if data.is_empty() {
43 return Err(CounterError::new(CounterErrorKind::InvalidCounterValue));
44 }
45
46 let payload: CounterPayload = serde_json::from_slice(data)
47 .map_err(|e| CounterError::with_source(CounterErrorKind::Serialization, e))?;
48
49 BigInt::from_str(&payload.val)
50 .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue))
51}
52
53pub fn parse_counter_value_from_string(value: Option<String>) -> Result<BigInt> {
66 match value {
67 Some(val_str) => BigInt::from_str(&val_str)
68 .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue)),
69 None => Err(CounterError::new(CounterErrorKind::MissingResponseValue)),
70 }
71}
72
73pub fn parse_sources(headers: &HeaderMap) -> Result<CounterSources> {
86 let sources_header = headers.get(crate::COUNTER_SOURCES_HEADER);
87
88 match sources_header {
89 Some(header_value) => {
90 let header_str = header_value.as_str();
91
92 let sources_json: HashMap<String, HashMap<String, String>> =
94 serde_json::from_str(header_str)
95 .map_err(|e| CounterError::with_source(CounterErrorKind::Serialization, e))?;
96
97 let mut sources = HashMap::new();
99 for (source_id, subjects) in sources_json {
100 let mut subject_values = HashMap::new();
101 for (subject, value_str) in subjects {
102 let value = BigInt::from_str(&value_str)
103 .map_err(|_| CounterError::new(CounterErrorKind::InvalidSources))?;
104 subject_values.insert(subject, value);
105 }
106 sources.insert(source_id, subject_values);
107 }
108
109 Ok(sources)
110 }
111 None => Ok(HashMap::new()),
112 }
113}
114
115pub fn parse_increment(headers: &HeaderMap) -> Result<Option<BigInt>> {
127 let increment_header = headers.get(crate::COUNTER_INCREMENT_HEADER);
128
129 match increment_header {
130 Some(header_value) => {
131 let increment_str = header_value.as_str();
132
133 let increment = BigInt::from_str(increment_str)
134 .map_err(|_| CounterError::new(CounterErrorKind::InvalidResponseValue))?;
135
136 Ok(Some(increment))
137 }
138 None => Ok(None),
139 }
140}
141
/// Unit tests for the counter parsing helpers: every success path plus each
/// error kind the parsers can return.
#[cfg(test)]
mod tests {
    use super::*;

    // ---- parse_counter_value -------------------------------------------------

    #[test]
    fn test_parse_counter_value_valid() {
        let data = br#"{"val": "123"}"#;
        let result = parse_counter_value(data).unwrap();
        assert_eq!(result, BigInt::from(123));
    }

    #[test]
    fn test_parse_counter_value_negative() {
        let data = br#"{"val": "-456"}"#;
        let result = parse_counter_value(data).unwrap();
        assert_eq!(result, BigInt::from(-456));
    }

    #[test]
    fn test_parse_counter_value_large() {
        // Larger than any primitive integer, so this exercises the BigInt path.
        let data = br#"{"val": "999999999999999999999999999999"}"#;
        let result = parse_counter_value(data).unwrap();
        assert_eq!(
            result,
            BigInt::from_str("999999999999999999999999999999").unwrap()
        );
    }

    #[test]
    fn test_parse_counter_value_empty() {
        let data = b"";
        let result = parse_counter_value(data);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidCounterValue));
    }

    #[test]
    fn test_parse_counter_value_invalid_json() {
        let data = b"not json";
        let result = parse_counter_value(data);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::Serialization));
    }

    #[test]
    fn test_parse_counter_value_missing_field() {
        // Valid JSON that lacks the required `val` field is a deserialization error.
        let data = br#"{}"#;
        let result = parse_counter_value(data);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::Serialization));
    }

    #[test]
    fn test_parse_counter_value_invalid_number() {
        let data = br#"{"val": "not a number"}"#;
        let result = parse_counter_value(data);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
    }

    // ---- parse_sources -------------------------------------------------------

    #[test]
    fn test_parse_sources_empty() {
        let headers = HeaderMap::new();
        let result = parse_sources(&headers).unwrap();
        assert!(result.is_empty());
    }

    #[test]
    fn test_parse_sources_valid() {
        let mut headers = HeaderMap::new();
        headers.insert(
            crate::COUNTER_SOURCES_HEADER,
            r#"{"stream1": {"subject1": "100", "subject2": "200"}}"#,
        );

        let result = parse_sources(&headers).unwrap();
        assert_eq!(result.len(), 1);
        assert!(result.contains_key("stream1"));

        let stream1 = &result["stream1"];
        assert_eq!(stream1.len(), 2);
        assert_eq!(stream1["subject1"], BigInt::from(100));
        assert_eq!(stream1["subject2"], BigInt::from(200));
    }

    #[test]
    fn test_parse_sources_multiple_streams() {
        let mut headers = HeaderMap::new();
        headers.insert(
            crate::COUNTER_SOURCES_HEADER,
            r#"{"stream1": {"sub1": "10"}, "stream2": {"sub2": "20"}}"#,
        );

        let result = parse_sources(&headers).unwrap();
        assert_eq!(result.len(), 2);
        assert_eq!(result["stream1"]["sub1"], BigInt::from(10));
        assert_eq!(result["stream2"]["sub2"], BigInt::from(20));
    }

    #[test]
    fn test_parse_sources_invalid_json() {
        let mut headers = HeaderMap::new();
        headers.insert(crate::COUNTER_SOURCES_HEADER, "not json");

        let result = parse_sources(&headers);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::Serialization));
    }

    #[test]
    fn test_parse_sources_invalid_value() {
        // Well-formed JSON whose counter value is not an integer.
        let mut headers = HeaderMap::new();
        headers.insert(
            crate::COUNTER_SOURCES_HEADER,
            r#"{"stream1": {"subject1": "not_a_number"}}"#,
        );

        let result = parse_sources(&headers);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidSources));
    }

    // ---- parse_increment -----------------------------------------------------

    #[test]
    fn test_parse_increment_present() {
        let mut headers = HeaderMap::new();
        headers.insert(crate::COUNTER_INCREMENT_HEADER, "42");

        let result = parse_increment(&headers).unwrap();
        assert_eq!(result, Some(BigInt::from(42)));
    }

    #[test]
    fn test_parse_increment_negative() {
        let mut headers = HeaderMap::new();
        headers.insert(crate::COUNTER_INCREMENT_HEADER, "-10");

        let result = parse_increment(&headers).unwrap();
        assert_eq!(result, Some(BigInt::from(-10)));
    }

    #[test]
    fn test_parse_increment_absent() {
        let headers = HeaderMap::new();
        let result = parse_increment(&headers).unwrap();
        assert_eq!(result, None);
    }

    #[test]
    fn test_parse_increment_invalid() {
        let mut headers = HeaderMap::new();
        headers.insert(crate::COUNTER_INCREMENT_HEADER, "not_a_number");

        let result = parse_increment(&headers);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
    }

    // ---- parse_counter_value_from_string -------------------------------------

    #[test]
    fn test_parse_counter_value_from_string_valid() {
        let value = Some("42".to_string());
        let result = parse_counter_value_from_string(value).unwrap();
        assert_eq!(result, BigInt::from(42));
    }

    #[test]
    fn test_parse_counter_value_from_string_large() {
        let value = Some("999999999999999999999999".to_string());
        let result = parse_counter_value_from_string(value).unwrap();
        assert_eq!(
            result,
            BigInt::from_str("999999999999999999999999").unwrap()
        );
    }

    #[test]
    fn test_parse_counter_value_from_string_none() {
        let result = parse_counter_value_from_string(None);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::MissingResponseValue));
    }

    #[test]
    fn test_parse_counter_value_from_string_invalid() {
        let value = Some("not_a_number".to_string());
        let result = parse_counter_value_from_string(value);
        assert!(matches!(result, Err(e) if e.kind() == CounterErrorKind::InvalidResponseValue));
    }
}