1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
3use torrust_clock::DurationSinceUnixEpoch;
4
5use super::counter::Counter;
6use super::gauge::Gauge;
7use super::label::LabelSet;
8use super::prometheus::PrometheusSerializable;
9
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub struct Sample<T> {
12 #[serde(flatten)]
13 measurement: Measurement<T>,
14
15 #[serde(rename = "labels")]
16 label_set: LabelSet,
17}
18
19impl<T> Sample<T> {
20 #[must_use]
21 pub fn new(value: T, recorded_at: DurationSinceUnixEpoch, label_set: LabelSet) -> Self {
22 let data = Measurement { value, recorded_at };
23
24 Self {
25 measurement: data,
26 label_set,
27 }
28 }
29
30 #[must_use]
31 pub fn measurement(&self) -> &Measurement<T> {
32 &self.measurement
33 }
34
35 #[must_use]
36 pub fn value(&self) -> &T {
37 &self.measurement.value
38 }
39
40 #[must_use]
41 pub fn recorded_at(&self) -> DurationSinceUnixEpoch {
42 self.measurement.recorded_at
43 }
44
45 #[must_use]
46 pub fn labels(&self) -> &LabelSet {
47 &self.label_set
48 }
49}
50
51impl<T: PrometheusSerializable> PrometheusSerializable for Sample<T> {
52 fn to_prometheus(&self) -> String {
53 if self.label_set.is_empty() {
54 format!(" {}", self.measurement.to_prometheus())
55 } else {
56 format!("{} {}", self.label_set.to_prometheus(), self.measurement.to_prometheus())
57 }
58 }
59}
60
61impl Sample<Counter> {
62 pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
63 self.measurement.increment(time);
64 }
65}
66
67impl Sample<Gauge> {
68 pub fn set(&mut self, value: f64, time: DurationSinceUnixEpoch) {
69 self.measurement.set(value, time);
70 }
71
72 pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
73 self.measurement.increment(time);
74 }
75
76 pub fn decrement(&mut self, time: DurationSinceUnixEpoch) {
77 self.measurement.decrement(time);
78 }
79}
80
81#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
82pub struct Measurement<T> {
83 value: T,
85
86 #[serde(serialize_with = "serialize_duration", deserialize_with = "deserialize_duration")]
88 recorded_at: DurationSinceUnixEpoch,
89}
90
91impl<T> Measurement<T> {
92 #[must_use]
93 pub fn new(value: T, recorded_at: DurationSinceUnixEpoch) -> Self {
94 Self { value, recorded_at }
95 }
96
97 #[must_use]
98 pub fn value(&self) -> &T {
99 &self.value
100 }
101
102 #[must_use]
103 pub fn recorded_at(&self) -> DurationSinceUnixEpoch {
104 self.recorded_at
105 }
106
107 fn set_recorded_at(&mut self, time: DurationSinceUnixEpoch) {
108 self.recorded_at = time;
109 }
110}
111
112impl<T> From<Sample<T>> for (LabelSet, Measurement<T>) {
113 fn from(sample: Sample<T>) -> Self {
114 (sample.label_set, sample.measurement)
115 }
116}
117
118impl<T: PrometheusSerializable> PrometheusSerializable for Measurement<T> {
119 fn to_prometheus(&self) -> String {
120 self.value.to_prometheus()
121 }
122}
123
124impl Measurement<Counter> {
125 pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
126 self.value.increment(1);
127 self.set_recorded_at(time);
128 }
129
130 pub fn absolute(&mut self, value: u64, time: DurationSinceUnixEpoch) {
131 self.value.absolute(value);
132 self.set_recorded_at(time);
133 }
134}
135
136impl Measurement<Gauge> {
137 pub fn set(&mut self, value: f64, time: DurationSinceUnixEpoch) {
138 self.value.set(value);
139 self.set_recorded_at(time);
140 }
141
142 pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
143 self.value.increment(1.0);
144 self.set_recorded_at(time);
145 }
146
147 pub fn decrement(&mut self, time: DurationSinceUnixEpoch) {
148 self.value.decrement(1.0);
149 self.set_recorded_at(time);
150 }
151}
152
153fn serialize_duration<S>(duration: &DurationSinceUnixEpoch, serializer: S) -> Result<S::Ok, S::Error>
161where
162 S: Serializer,
163{
164 let secs = i64::try_from(duration.as_secs()).map_err(|_| serde::ser::Error::custom("Timestamp too large"))?;
165 let nanos = duration.subsec_nanos();
166
167 let datetime = DateTime::from_timestamp(secs, nanos).ok_or_else(|| serde::ser::Error::custom("Invalid timestamp"))?;
168
169 serializer.serialize_str(&datetime.to_rfc3339()) }
171
172fn deserialize_duration<'de, D>(deserializer: D) -> Result<DurationSinceUnixEpoch, D::Error>
173where
174 D: Deserializer<'de>,
175{
176 let datetime_str = String::deserialize(deserializer)?;
178
179 let datetime =
180 DateTime::parse_from_rfc3339(&datetime_str).map_err(|e| de::Error::custom(format!("Invalid datetime format: {e}")))?;
181
182 let datetime_utc = datetime.with_timezone(&Utc);
183
184 let secs = u64::try_from(datetime_utc.timestamp()).map_err(|_| de::Error::custom("Timestamp out of range"))?;
185
186 Ok(DurationSinceUnixEpoch::new(secs, datetime_utc.timestamp_subsec_nanos()))
187}
188
189#[cfg(test)]
190mod tests {
191 use torrust_clock::DurationSinceUnixEpoch;
192
193 use super::*;
194
195 fn updated_at_time() -> DurationSinceUnixEpoch {
197 DurationSinceUnixEpoch::from_secs(1_743_552_000)
198 }
199
200 #[test]
201 fn it_should_have_a_value() {
202 let sample = Sample::new(
203 42,
204 DurationSinceUnixEpoch::from_secs(1_743_552_000),
205 LabelSet::from(vec![("test", "label")]),
206 );
207
208 assert_eq!(sample.value(), &42);
209 }
210
211 #[test]
212 fn it_should_record_the_latest_update_time() {
213 let sample = Sample::new(
214 42,
215 DurationSinceUnixEpoch::from_secs(1_743_552_000),
216 LabelSet::from(vec![("test", "label")]),
217 );
218
219 assert_eq!(sample.recorded_at(), updated_at_time());
220 }
221
222 #[test]
223 fn it_should_include_a_label_set() {
224 let sample = Sample::new(
225 42,
226 DurationSinceUnixEpoch::from_secs(1_743_552_000),
227 LabelSet::from(vec![("test", "label")]),
228 );
229
230 assert_eq!(sample.labels(), &LabelSet::from(vec![("test", "label")]));
231 }
232
233 #[test]
234 fn it_should_expose_measurement() {
235 let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
236 let sample = Sample::new(42_u32, time, LabelSet::from(vec![("k", "v")]));
237
238 let measurement = sample.measurement();
239
240 assert_eq!(measurement.value(), &42_u32);
241 assert_eq!(measurement.recorded_at(), time);
242 }
243
244 #[test]
245 fn it_should_allow_creating_measurement_directly() {
246 let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
247 let measurement = Measurement::new(99_u32, time);
248
249 assert_eq!(measurement.value(), &99_u32);
250 assert_eq!(measurement.recorded_at(), time);
251 }
252
253 #[test]
254 fn it_should_allow_converting_sample_into_label_set_and_measurement() {
255 let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
256 let label_set = LabelSet::from(vec![("env", "prod")]);
257 let sample = Sample::new(7_u32, time, label_set.clone());
258
259 let (labels, meas): (LabelSet, Measurement<u32>) = sample.into();
260
261 assert_eq!(labels, label_set);
262 assert_eq!(meas.value(), &7_u32);
263 assert_eq!(meas.recorded_at(), time);
264 }
265
266 mod for_counter_type_sample {
267 use torrust_clock::DurationSinceUnixEpoch;
268
269 use crate::label::LabelSet;
270 use crate::prometheus::PrometheusSerializable;
271 use crate::sample::tests::updated_at_time;
272 use crate::sample::{Counter, Sample};
273
274 #[test]
275 fn it_should_allow_a_counter_type_value() {
276 let sample = Sample::new(
277 Counter::new(42),
278 DurationSinceUnixEpoch::from_secs(1_743_552_000),
279 LabelSet::from(vec![("label_name", "label vale")]),
280 );
281
282 assert_eq!(sample.value(), &Counter::new(42));
283 }
284
285 #[test]
286 fn it_should_allow_incrementing_the_counter() {
287 let mut sample = Sample::new(Counter::default(), DurationSinceUnixEpoch::default(), LabelSet::default());
288
289 sample.increment(updated_at_time());
290
291 assert_eq!(sample.value(), &Counter::new(1));
292 }
293
294 #[test]
295 fn it_should_record_the_latest_update_time_when_the_counter_is_incremented() {
296 let mut sample = Sample::new(Counter::default(), DurationSinceUnixEpoch::default(), LabelSet::default());
297
298 let time = updated_at_time();
299
300 sample.increment(time);
301
302 assert_eq!(sample.recorded_at(), time);
303 }
304
305 #[test]
306 fn it_should_allow_exporting_to_prometheus_format() {
307 let counter = Counter::new(42);
308
309 let labels = LabelSet::from(vec![("label_name", "label_value"), ("method", "GET")]);
310
311 let sample = Sample::new(counter, DurationSinceUnixEpoch::default(), labels);
312
313 assert_eq!(sample.to_prometheus(), r#"{label_name="label_value",method="GET"} 42"#);
314 }
315
316 #[test]
317 fn it_should_allow_exporting_to_prometheus_format_with_empty_label_set() {
318 let counter = Counter::new(42);
319
320 let sample = Sample::new(counter, DurationSinceUnixEpoch::default(), LabelSet::default());
321
322 assert_eq!(sample.to_prometheus(), " 42");
323 }
324 }
325 mod for_gauge_type_sample {
326 use torrust_clock::DurationSinceUnixEpoch;
327
328 use crate::label::LabelSet;
329 use crate::prometheus::PrometheusSerializable;
330 use crate::sample::tests::updated_at_time;
331 use crate::sample::{Gauge, Sample};
332
333 #[test]
334 fn it_should_allow_a_counter_type_value() {
335 let sample = Sample::new(
336 Gauge::new(42.0),
337 DurationSinceUnixEpoch::from_secs(1_743_552_000),
338 LabelSet::from(vec![("label_name", "label vale")]),
339 );
340
341 assert_eq!(sample.value(), &Gauge::new(42.0));
342 }
343
344 #[test]
345 fn it_should_allow_setting_a_value() {
346 let mut sample = Sample::new(Gauge::default(), DurationSinceUnixEpoch::default(), LabelSet::default());
347
348 sample.set(1.0, updated_at_time());
349
350 assert_eq!(sample.value(), &Gauge::new(1.0));
351 }
352
353 #[test]
354 fn it_should_allow_incrementing_the_value() {
355 let mut sample = Sample::new(Gauge::new(0.0), DurationSinceUnixEpoch::default(), LabelSet::default());
356
357 sample.increment(updated_at_time());
358
359 assert_eq!(sample.value(), &Gauge::new(1.0));
360 }
361
362 #[test]
363 fn it_should_allow_decrementing_the_value() {
364 let mut sample = Sample::new(Gauge::new(1.0), DurationSinceUnixEpoch::default(), LabelSet::default());
365
366 sample.decrement(updated_at_time());
367
368 assert_eq!(sample.value(), &Gauge::new(0.0));
369 }
370
371 #[test]
372 fn it_should_record_the_latest_update_time_when_the_counter_is_incremented() {
373 let mut sample = Sample::new(Gauge::default(), DurationSinceUnixEpoch::default(), LabelSet::default());
374
375 let time = updated_at_time();
376
377 sample.set(1.0, time);
378
379 assert_eq!(sample.recorded_at(), time);
380 }
381
382 #[test]
383 fn it_should_allow_exporting_to_prometheus_format() {
384 let counter = Gauge::new(42.0);
385
386 let labels = LabelSet::from(vec![("label_name", "label_value"), ("method", "GET")]);
387
388 let sample = Sample::new(counter, DurationSinceUnixEpoch::default(), labels);
389
390 assert_eq!(sample.to_prometheus(), r#"{label_name="label_value",method="GET"} 42"#);
391 }
392
393 #[test]
394 fn it_should_allow_exporting_to_prometheus_format_with_empty_label_set() {
395 let gauge = Gauge::new(42.0);
396
397 let sample = Sample::new(gauge, DurationSinceUnixEpoch::default(), LabelSet::default());
398
399 assert_eq!(sample.to_prometheus(), " 42");
400 }
401 }
402
403 mod serialization_to_json {
404 use pretty_assertions::assert_eq;
405 use serde_json::json;
406 use torrust_clock::DurationSinceUnixEpoch;
407
408 use crate::label::LabelSet;
409 use crate::sample::Sample;
410 use crate::sample::tests::updated_at_time;
411
412 #[test]
413 fn test_serialization_round_trip() {
414 let original = Sample::new(42, updated_at_time(), LabelSet::from(vec![("test", "serialization")]));
415
416 let json = serde_json::to_string(&original).unwrap();
417 let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();
418
419 assert_eq!(original.measurement.value, deserialized.measurement.value);
420 assert_eq!(original.measurement.recorded_at, deserialized.measurement.recorded_at);
421 assert_eq!(original.label_set, deserialized.label_set);
422 }
423
424 #[test]
425 fn test_rfc3339_serialization_format_for_update_time() {
426 let sample = Sample::new(
427 42,
428 DurationSinceUnixEpoch::new(1_743_552_000, 100),
429 LabelSet::from(vec![("label_name", "label value")]),
430 );
431
432 let json = serde_json::to_string(&sample).unwrap();
433
434 let expected_json = r#"
435 {
436 "value": 42,
437 "recorded_at": "2025-04-02T00:00:00.000000100+00:00",
438 "labels": [
439 {
440 "name": "label_name",
441 "value": "label value"
442 }
443 ]
444 }
445 "#;
446
447 assert_eq!(
448 serde_json::from_str::<serde_json::Value>(&json).unwrap(),
449 serde_json::from_str::<serde_json::Value>(expected_json).unwrap()
450 );
451 }
452
453 #[test]
454 fn test_invalid_update_timestamp_serialization() {
455 let timestamp_too_large = DurationSinceUnixEpoch::new(i64::MAX as u64 + 1, 0);
456
457 let sample = Sample::new(42, timestamp_too_large, LabelSet::from(vec![("label_name", "label value")]));
458
459 let result = serde_json::to_string(&sample);
460
461 assert!(result.is_err());
462 assert!(result.unwrap_err().to_string().contains("Timestamp too large"));
463 }
464
465 #[test]
466 fn test_invalid_update_datetime_deserialization() {
467 let invalid_json = json!(
468 r#"
469 {
470 "value": 42,
471 "recorded_at": "1-1-2023T25:00:00Z",
472 "labels": [
473 {
474 "name": "label_name",
475 "value": "label value"
476 }
477 ]
478 }
479 "#
480 );
481
482 let result: Result<DurationSinceUnixEpoch, _> = serde_json::from_value(invalid_json);
483
484 assert!(result.unwrap_err().to_string().contains("invalid type"));
485 }
486
487 #[test]
488 fn test_update_datetime_high_precision_nanoseconds() {
489 let sample = Sample::new(
490 42,
491 DurationSinceUnixEpoch::new(1_743_552_000, 100),
492 LabelSet::from(vec![("label_name", "label value")]),
493 );
494
495 let json = serde_json::to_string(&sample).unwrap();
496
497 let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();
498
499 assert_eq!(deserialized, sample);
500 }
501
502 #[test]
503 fn test_serialization_round_trip_with_pretty_formatter() {
504 let sample = Sample::new(
507 42,
508 DurationSinceUnixEpoch::new(1_743_552_000, 0),
509 LabelSet::from(vec![("env", "prod")]),
510 );
511
512 let json = serde_json::to_string_pretty(&sample).unwrap();
513 let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();
514
515 assert_eq!(deserialized, sample);
516 }
517 }
518}