Skip to main content

torrust_metrics/
sample.rs

1use chrono::{DateTime, Utc};
2use serde::{Deserialize, Deserializer, Serialize, Serializer, de};
3use torrust_clock::DurationSinceUnixEpoch;
4
5use super::counter::Counter;
6use super::gauge::Gauge;
7use super::label::LabelSet;
8use super::prometheus::PrometheusSerializable;
9
10#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
11pub struct Sample<T> {
12    #[serde(flatten)]
13    measurement: Measurement<T>,
14
15    #[serde(rename = "labels")]
16    label_set: LabelSet,
17}
18
19impl<T> Sample<T> {
20    #[must_use]
21    pub fn new(value: T, recorded_at: DurationSinceUnixEpoch, label_set: LabelSet) -> Self {
22        let data = Measurement { value, recorded_at };
23
24        Self {
25            measurement: data,
26            label_set,
27        }
28    }
29
30    #[must_use]
31    pub fn measurement(&self) -> &Measurement<T> {
32        &self.measurement
33    }
34
35    #[must_use]
36    pub fn value(&self) -> &T {
37        &self.measurement.value
38    }
39
40    #[must_use]
41    pub fn recorded_at(&self) -> DurationSinceUnixEpoch {
42        self.measurement.recorded_at
43    }
44
45    #[must_use]
46    pub fn labels(&self) -> &LabelSet {
47        &self.label_set
48    }
49}
50
51impl<T: PrometheusSerializable> PrometheusSerializable for Sample<T> {
52    fn to_prometheus(&self) -> String {
53        if self.label_set.is_empty() {
54            format!(" {}", self.measurement.to_prometheus())
55        } else {
56            format!("{} {}", self.label_set.to_prometheus(), self.measurement.to_prometheus())
57        }
58    }
59}
60
61impl Sample<Counter> {
62    pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
63        self.measurement.increment(time);
64    }
65}
66
67impl Sample<Gauge> {
68    pub fn set(&mut self, value: f64, time: DurationSinceUnixEpoch) {
69        self.measurement.set(value, time);
70    }
71
72    pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
73        self.measurement.increment(time);
74    }
75
76    pub fn decrement(&mut self, time: DurationSinceUnixEpoch) {
77        self.measurement.decrement(time);
78    }
79}
80
81#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
82pub struct Measurement<T> {
83    /// The value of the sample.
84    value: T,
85
86    /// The time when the sample was last updated.
87    #[serde(serialize_with = "serialize_duration", deserialize_with = "deserialize_duration")]
88    recorded_at: DurationSinceUnixEpoch,
89}
90
91impl<T> Measurement<T> {
92    #[must_use]
93    pub fn new(value: T, recorded_at: DurationSinceUnixEpoch) -> Self {
94        Self { value, recorded_at }
95    }
96
97    #[must_use]
98    pub fn value(&self) -> &T {
99        &self.value
100    }
101
102    #[must_use]
103    pub fn recorded_at(&self) -> DurationSinceUnixEpoch {
104        self.recorded_at
105    }
106
107    fn set_recorded_at(&mut self, time: DurationSinceUnixEpoch) {
108        self.recorded_at = time;
109    }
110}
111
112impl<T> From<Sample<T>> for (LabelSet, Measurement<T>) {
113    fn from(sample: Sample<T>) -> Self {
114        (sample.label_set, sample.measurement)
115    }
116}
117
118impl<T: PrometheusSerializable> PrometheusSerializable for Measurement<T> {
119    fn to_prometheus(&self) -> String {
120        self.value.to_prometheus()
121    }
122}
123
124impl Measurement<Counter> {
125    pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
126        self.value.increment(1);
127        self.set_recorded_at(time);
128    }
129
130    pub fn absolute(&mut self, value: u64, time: DurationSinceUnixEpoch) {
131        self.value.absolute(value);
132        self.set_recorded_at(time);
133    }
134}
135
136impl Measurement<Gauge> {
137    pub fn set(&mut self, value: f64, time: DurationSinceUnixEpoch) {
138        self.value.set(value);
139        self.set_recorded_at(time);
140    }
141
142    pub fn increment(&mut self, time: DurationSinceUnixEpoch) {
143        self.value.increment(1.0);
144        self.set_recorded_at(time);
145    }
146
147    pub fn decrement(&mut self, time: DurationSinceUnixEpoch) {
148        self.value.decrement(1.0);
149        self.set_recorded_at(time);
150    }
151}
152
153/// Serializes the `recorded_at` field as a string in ISO 8601 format (RFC 3339).
154///
155/// # Errors
156///
157/// Returns an error if:
158/// - The conversion from `u64` to `i64` fails.
159/// - The timestamp is invalid.
160fn serialize_duration<S>(duration: &DurationSinceUnixEpoch, serializer: S) -> Result<S::Ok, S::Error>
161where
162    S: Serializer,
163{
164    let secs = i64::try_from(duration.as_secs()).map_err(|_| serde::ser::Error::custom("Timestamp too large"))?;
165    let nanos = duration.subsec_nanos();
166
167    let datetime = DateTime::from_timestamp(secs, nanos).ok_or_else(|| serde::ser::Error::custom("Invalid timestamp"))?;
168
169    serializer.serialize_str(&datetime.to_rfc3339()) // Serializes as ISO 8601 (RFC 3339)
170}
171
172fn deserialize_duration<'de, D>(deserializer: D) -> Result<DurationSinceUnixEpoch, D::Error>
173where
174    D: Deserializer<'de>,
175{
176    // Deserialize theISO 8601 (RFC 3339) formatted string
177    let datetime_str = String::deserialize(deserializer)?;
178
179    let datetime =
180        DateTime::parse_from_rfc3339(&datetime_str).map_err(|e| de::Error::custom(format!("Invalid datetime format: {e}")))?;
181
182    let datetime_utc = datetime.with_timezone(&Utc);
183
184    let secs = u64::try_from(datetime_utc.timestamp()).map_err(|_| de::Error::custom("Timestamp out of range"))?;
185
186    Ok(DurationSinceUnixEpoch::new(secs, datetime_utc.timestamp_subsec_nanos()))
187}
188
#[cfg(test)]
mod tests {
    use torrust_clock::DurationSinceUnixEpoch;

    use super::*;

    /// Fixed timestamp (2025-04-02T00:00:00Z) shared by the tests.
    fn updated_at_time() -> DurationSinceUnixEpoch {
        DurationSinceUnixEpoch::from_secs(1_743_552_000)
    }

    #[test]
    fn it_should_have_a_value() {
        let sample = Sample::new(
            42,
            DurationSinceUnixEpoch::from_secs(1_743_552_000),
            LabelSet::from(vec![("test", "label")]),
        );

        assert_eq!(sample.value(), &42);
    }

    #[test]
    fn it_should_record_the_latest_update_time() {
        let sample = Sample::new(
            42,
            DurationSinceUnixEpoch::from_secs(1_743_552_000),
            LabelSet::from(vec![("test", "label")]),
        );

        assert_eq!(sample.recorded_at(), updated_at_time());
    }

    #[test]
    fn it_should_include_a_label_set() {
        let sample = Sample::new(
            42,
            DurationSinceUnixEpoch::from_secs(1_743_552_000),
            LabelSet::from(vec![("test", "label")]),
        );

        assert_eq!(sample.labels(), &LabelSet::from(vec![("test", "label")]));
    }

    #[test]
    fn it_should_expose_measurement() {
        let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
        let sample = Sample::new(42_u32, time, LabelSet::from(vec![("k", "v")]));

        let measurement = sample.measurement();

        assert_eq!(measurement.value(), &42_u32);
        assert_eq!(measurement.recorded_at(), time);
    }

    #[test]
    fn it_should_allow_creating_measurement_directly() {
        let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
        let measurement = Measurement::new(99_u32, time);

        assert_eq!(measurement.value(), &99_u32);
        assert_eq!(measurement.recorded_at(), time);
    }

    #[test]
    fn it_should_allow_converting_sample_into_label_set_and_measurement() {
        let time = DurationSinceUnixEpoch::from_secs(1_743_552_000);
        let label_set = LabelSet::from(vec![("env", "prod")]);
        let sample = Sample::new(7_u32, time, label_set.clone());

        let (labels, meas): (LabelSet, Measurement<u32>) = sample.into();

        assert_eq!(labels, label_set);
        assert_eq!(meas.value(), &7_u32);
        assert_eq!(meas.recorded_at(), time);
    }

    mod for_counter_type_sample {
        use torrust_clock::DurationSinceUnixEpoch;

        use crate::label::LabelSet;
        use crate::prometheus::PrometheusSerializable;
        use crate::sample::tests::updated_at_time;
        use crate::sample::{Counter, Sample};

        #[test]
        fn it_should_allow_a_counter_type_value() {
            let sample = Sample::new(
                Counter::new(42),
                DurationSinceUnixEpoch::from_secs(1_743_552_000),
                LabelSet::from(vec![("label_name", "label vale")]),
            );

            assert_eq!(sample.value(), &Counter::new(42));
        }

        #[test]
        fn it_should_allow_incrementing_the_counter() {
            let mut sample = Sample::new(Counter::default(), DurationSinceUnixEpoch::default(), LabelSet::default());

            sample.increment(updated_at_time());

            assert_eq!(sample.value(), &Counter::new(1));
        }

        #[test]
        fn it_should_record_the_latest_update_time_when_the_counter_is_incremented() {
            let mut sample = Sample::new(Counter::default(), DurationSinceUnixEpoch::default(), LabelSet::default());

            let time = updated_at_time();

            sample.increment(time);

            assert_eq!(sample.recorded_at(), time);
        }

        #[test]
        fn it_should_allow_exporting_to_prometheus_format() {
            let counter = Counter::new(42);

            let labels = LabelSet::from(vec![("label_name", "label_value"), ("method", "GET")]);

            let sample = Sample::new(counter, DurationSinceUnixEpoch::default(), labels);

            assert_eq!(sample.to_prometheus(), r#"{label_name="label_value",method="GET"} 42"#);
        }

        #[test]
        fn it_should_allow_exporting_to_prometheus_format_with_empty_label_set() {
            let counter = Counter::new(42);

            let sample = Sample::new(counter, DurationSinceUnixEpoch::default(), LabelSet::default());

            assert_eq!(sample.to_prometheus(), " 42");
        }
    }

    mod for_gauge_type_sample {
        use torrust_clock::DurationSinceUnixEpoch;

        use crate::label::LabelSet;
        use crate::prometheus::PrometheusSerializable;
        use crate::sample::tests::updated_at_time;
        use crate::sample::{Gauge, Sample};

        #[test]
        fn it_should_allow_a_gauge_type_value() {
            let sample = Sample::new(
                Gauge::new(42.0),
                DurationSinceUnixEpoch::from_secs(1_743_552_000),
                LabelSet::from(vec![("label_name", "label vale")]),
            );

            assert_eq!(sample.value(), &Gauge::new(42.0));
        }

        #[test]
        fn it_should_allow_setting_a_value() {
            let mut sample = Sample::new(Gauge::default(), DurationSinceUnixEpoch::default(), LabelSet::default());

            sample.set(1.0, updated_at_time());

            assert_eq!(sample.value(), &Gauge::new(1.0));
        }

        #[test]
        fn it_should_allow_incrementing_the_value() {
            let mut sample = Sample::new(Gauge::new(0.0), DurationSinceUnixEpoch::default(), LabelSet::default());

            sample.increment(updated_at_time());

            assert_eq!(sample.value(), &Gauge::new(1.0));
        }

        #[test]
        fn it_should_allow_decrementing_the_value() {
            let mut sample = Sample::new(Gauge::new(1.0), DurationSinceUnixEpoch::default(), LabelSet::default());

            sample.decrement(updated_at_time());

            assert_eq!(sample.value(), &Gauge::new(0.0));
        }

        #[test]
        fn it_should_record_the_latest_update_time_when_the_gauge_is_set() {
            let mut sample = Sample::new(Gauge::default(), DurationSinceUnixEpoch::default(), LabelSet::default());

            let time = updated_at_time();

            sample.set(1.0, time);

            assert_eq!(sample.recorded_at(), time);
        }

        #[test]
        fn it_should_allow_exporting_to_prometheus_format() {
            let gauge = Gauge::new(42.0);

            let labels = LabelSet::from(vec![("label_name", "label_value"), ("method", "GET")]);

            let sample = Sample::new(gauge, DurationSinceUnixEpoch::default(), labels);

            assert_eq!(sample.to_prometheus(), r#"{label_name="label_value",method="GET"} 42"#);
        }

        #[test]
        fn it_should_allow_exporting_to_prometheus_format_with_empty_label_set() {
            let gauge = Gauge::new(42.0);

            let sample = Sample::new(gauge, DurationSinceUnixEpoch::default(), LabelSet::default());

            assert_eq!(sample.to_prometheus(), " 42");
        }
    }

    mod serialization_to_json {
        use pretty_assertions::assert_eq;
        use torrust_clock::DurationSinceUnixEpoch;

        use crate::label::LabelSet;
        use crate::sample::Sample;
        use crate::sample::tests::updated_at_time;

        #[test]
        fn test_serialization_round_trip() {
            let original = Sample::new(42, updated_at_time(), LabelSet::from(vec![("test", "serialization")]));

            let json = serde_json::to_string(&original).unwrap();
            let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();

            assert_eq!(original.measurement.value, deserialized.measurement.value);
            assert_eq!(original.measurement.recorded_at, deserialized.measurement.recorded_at);
            assert_eq!(original.label_set, deserialized.label_set);
        }

        #[test]
        fn test_rfc3339_serialization_format_for_update_time() {
            let sample = Sample::new(
                42,
                DurationSinceUnixEpoch::new(1_743_552_000, 100),
                LabelSet::from(vec![("label_name", "label value")]),
            );

            let json = serde_json::to_string(&sample).unwrap();

            let expected_json = r#"
                {
                    "value": 42,
                    "recorded_at": "2025-04-02T00:00:00.000000100+00:00",
                    "labels": [
                        {
                        "name": "label_name",
                        "value": "label value"
                        }
                    ]
                }
            "#;

            assert_eq!(
                serde_json::from_str::<serde_json::Value>(&json).unwrap(),
                serde_json::from_str::<serde_json::Value>(expected_json).unwrap()
            );
        }

        #[test]
        fn test_invalid_update_timestamp_serialization() {
            let timestamp_too_large = DurationSinceUnixEpoch::new(i64::MAX as u64 + 1, 0);

            let sample = Sample::new(42, timestamp_too_large, LabelSet::from(vec![("label_name", "label value")]));

            let result = serde_json::to_string(&sample);

            assert!(result.is_err());
            assert!(result.unwrap_err().to_string().contains("Timestamp too large"));
        }

        #[test]
        fn test_invalid_update_datetime_deserialization() {
            // Parse the document as a `Sample` so that the malformed
            // `recorded_at` string actually reaches `deserialize_duration`.
            let invalid_json = r#"
                {
                    "value": 42,
                    "recorded_at": "1-1-2023T25:00:00Z",
                    "labels": [
                        {
                        "name": "label_name",
                        "value": "label value"
                        }
                    ]
                }
            "#;

            let result = serde_json::from_str::<Sample<i32>>(invalid_json);

            assert!(result.unwrap_err().to_string().contains("Invalid datetime format"));
        }

        #[test]
        fn test_pre_epoch_update_datetime_deserialization() {
            // A datetime before the Unix epoch has a negative timestamp, which
            // cannot be stored as an unsigned duration.
            let pre_epoch_json = r#"
                {
                    "value": 42,
                    "recorded_at": "1969-12-31T23:59:59Z",
                    "labels": [
                        {
                        "name": "label_name",
                        "value": "label value"
                        }
                    ]
                }
            "#;

            let result = serde_json::from_str::<Sample<i32>>(pre_epoch_json);

            assert!(result.unwrap_err().to_string().contains("Timestamp out of range"));
        }

        #[test]
        fn test_update_datetime_high_precision_nanoseconds() {
            let sample = Sample::new(
                42,
                DurationSinceUnixEpoch::new(1_743_552_000, 100),
                LabelSet::from(vec![("label_name", "label value")]),
            );

            let json = serde_json::to_string(&sample).unwrap();

            let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();

            assert_eq!(deserialized, sample);
        }

        #[test]
        fn test_serialization_round_trip_with_pretty_formatter() {
            // Use serde_json::to_string_pretty to exercise the PrettyFormatter
            // monomorphisation of serialize_duration.
            let sample = Sample::new(
                42,
                DurationSinceUnixEpoch::new(1_743_552_000, 0),
                LabelSet::from(vec![("env", "prod")]),
            );

            let json = serde_json::to_string_pretty(&sample).unwrap();
            let deserialized: Sample<i32> = serde_json::from_str(&json).unwrap();

            assert_eq!(deserialized, sample);
        }
    }
}