modio_logger_db/
buffer.rs

1// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
2// SPDX-License-Identifier: AGPL-3.0-or-later
3use crate::types::Metric;
4use async_std::sync::Mutex;
5use std::sync::Arc;
6use tracing::instrument;
7
8type BufMetric = (Metric, TimeStatus);
9type BufMetrics = Vec<BufMetric>;
10
/// Status flag stored next to each buffered metric.
///
/// Each variant has a fixed upper-case label, see [`TimeStatus::as_str`].
#[derive(Debug, Clone, Copy)]
pub(crate) enum TimeStatus {
    /// Produced by `from_bool(true)`; labelled `"TIMEFAIL"`.
    TimeFail,
    /// Produced by `from_bool(false)`; labelled `"NONE"`.
    None,
}

impl TimeStatus {
    /// Returns the upper-case label for this status.
    ///
    /// The labels are string literals, so the returned slice is `'static` and
    /// does not borrow from `self`; callers may keep it after the status value
    /// itself has gone out of scope.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::TimeFail => "TIMEFAIL",
            Self::None => "NONE",
        }
    }

    /// Converts a boolean "timefail" flag into a status.
    ///
    /// `true` maps to [`Self::TimeFail`], `false` maps to [`Self::None`].
    pub const fn from_bool(timefail: bool) -> Self {
        if timefail {
            Self::TimeFail
        } else {
            Self::None
        }
    }
}
31
32#[derive(Clone)]
33pub(crate) struct VecBuffer {
34    data: Arc<Mutex<BufMetrics>>,
35}
36// To avoid printing out _all_ the values in a buffer, or the spammy Arc<Mutex> state, we implement
37// a simplified Debug.
38impl std::fmt::Debug for VecBuffer {
39    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40        let mut d = f.debug_struct("VecBuffer");
41        match self.data.try_lock() {
42            Some(inner) => {
43                d.field("len", &inner.len());
44            }
45            _ => {
46                d.field("len", &"<Locked>");
47            }
48        }
49        d.finish()
50    }
51}
52
53fn metric_cmp(a: &BufMetric, b: &BufMetric) -> std::cmp::Ordering {
54    a.0.time.partial_cmp(&b.0.time).expect("NaN or Inf time")
55}
56
57impl VecBuffer {
58    pub fn new() -> Self {
59        let inner = Vec::with_capacity(100);
60        let data = Arc::new(Mutex::new(inner));
61        Self { data }
62    }
63
64    #[instrument(skip_all)]
65    pub async fn add_metric(&self, metric: Metric, status: TimeStatus) {
66        let pair = (metric, status);
67        {
68            let inner = &mut self.data.lock().await;
69            inner.push(pair);
70        }
71    }
72
73    #[instrument(skip_all)]
74    pub async fn get_metric(&self, name: &str) -> Option<Metric> {
75        let res = {
76            let inner = &self.data.lock().await;
77            inner
78                .iter()
79                .filter(|m| m.0.name == name) // Only matching keys
80                .max_by(|a, b| metric_cmp(a, b)) // Order by timestamp
81                .map(|m| m.0.clone()) // Copy the data out
82        };
83        res
84    }
85
86    #[instrument(skip_all)]
87    pub fn count(&self) -> Option<usize> {
88        // async_std::{Mutex,RwLock}::try_(lock|read|write)() returns an Option, bubble "None" out as if nothing
89        // was found.
90        let res = {
91            let inner = &self.data.try_lock()?;
92            inner.len()
93        };
94        Some(res)
95    }
96
97    // Oldest does not wait, if it is blocked we return None
98    #[instrument(skip_all)]
99    pub fn oldest(&self) -> Option<f64> {
100        let res = {
101            let inner = &self.data.try_lock()?;
102            inner
103                .iter()
104                .map(|m| m.0.time)
105                .reduce(f64::min)
106                .map_or(0.0, |ts| ts)
107        };
108        Some(res)
109    }
110
111    #[instrument(skip_all)]
112    pub async fn consume_metrics(&self) -> BufMetrics {
113        {
114            let inner = &mut self.data.lock().await;
115            inner.drain(..).collect()
116        }
117    }
118
119    #[instrument(skip_all)]
120    pub async fn add_metrics(&self, mut data: BufMetrics) {
121        data.sort_unstable_by(metric_cmp);
122        {
123            let inner = &mut self.data.lock().await;
124            inner.append(&mut data);
125            // We want the buffer to be sorted by time, with freshest at the end.
126            inner.sort_unstable_by(metric_cmp);
127        }
128    }
129}
130
/// Current Unix time in whole seconds as a signed integer.
///
/// Returns `0` when the system clock reads earlier than the Unix epoch.
#[must_use]
// A second count large enough to wrap an i64 would mean far bigger problems.
#[allow(clippy::cast_possible_wrap)]
pub fn inixtime() -> i64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
140
/// Current Unix time in whole seconds.
///
/// Returns `0` when the system clock reads earlier than the Unix epoch.
#[must_use]
pub fn unixtime() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    let since_epoch = SystemTime::now().duration_since(UNIX_EPOCH);
    since_epoch.map(|elapsed| elapsed.as_secs()).unwrap_or(0)
}
148
/// Current Unix time as fractional seconds.
///
/// Returns `0.0` when the system clock reads earlier than the Unix epoch; a
/// clock that far off is a bigger problem than reporting a zero timestamp.
#[must_use]
pub fn fxtime() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    if let Ok(elapsed) = SystemTime::now().duration_since(UNIX_EPOCH) {
        elapsed.as_secs_f64()
    } else {
        0.0
    }
}
157
/// Current Unix time as fractional seconds, limited to millisecond precision.
///
/// Built from the whole seconds plus the millisecond part of the sub-second
/// component, so finer digits are dropped. Returns `0.0` when the system clock
/// reads earlier than the Unix epoch.
#[must_use]
// The second count is converted to f64 on purpose; the loss of precision for
// very large values is accepted here.
#[allow(clippy::cast_precision_loss)]
pub fn fxtime_ms() -> f64 {
    use std::time::{SystemTime, UNIX_EPOCH};
    match SystemTime::now().duration_since(UNIX_EPOCH) {
        Ok(elapsed) => {
            let whole = elapsed.as_secs() as f64;
            let fraction = f64::from(elapsed.subsec_millis()) / 1_000.0;
            whole + fraction
        }
        Err(_) => 0.0,
    }
}
169
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;
    use test_log::test;
    use timeout_macro::timeouttest;

    /// Builds a metric with the given name and value, stamped with the current time.
    fn mtrc(name: &str, value: &str) -> Metric {
        Metric {
            name: String::from(name),
            value: String::from(value),
            time: fxtime(),
        }
    }

    /// Adds two `test.case` metrics: the first with status `None`, the second
    /// with status `TimeFail`.
    async fn add_two(ds: &VecBuffer) {
        for status in [TimeStatus::None, TimeStatus::TimeFail] {
            let metric = mtrc("test.case", "test.value");
            ds.add_metric(metric, status).await;
        }
    }

    #[test(timeouttest)]
    async fn test_insert_remove() -> Result<(), Box<dyn Error>> {
        do_insert_remove(VecBuffer::new()).await
    }

    /// Inserted metrics are counted, returned once by consume, and gone afterwards.
    async fn do_insert_remove(ds: VecBuffer) -> Result<(), Box<dyn Error>> {
        add_two(&ds).await;
        // Both inserts must be visible
        assert_eq!(2, ds.count().expect("Locked?"));

        // Consuming hands back both entries
        let first = ds.consume_metrics().await;
        assert_eq!(first.len(), 2);

        // The buffer is empty afterwards
        assert_eq!(0, ds.count().expect("Locked?"));

        // A second consume has nothing left to return
        let second = ds.consume_metrics().await;
        assert_eq!(second.len(), 0);
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_consume_append() -> Result<(), Box<dyn Error>> {
        let ds = VecBuffer::new();
        add_two(&ds).await;
        // Both inserts must be visible
        assert_eq!(2, ds.count().expect("Locked?"));

        // Consuming hands back both entries
        let taken = ds.consume_metrics().await;
        assert_eq!(taken.len(), 2);

        // The buffer is empty afterwards
        assert_eq!(0, ds.count().expect("Locked?"));

        // Put the consumed entries back
        ds.add_metrics(taken).await;

        // Both entries are buffered again
        assert_eq!(2, ds.count().expect("Locked?"));
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_keys() -> Result<(), Box<dyn Error>> {
        do_test_keys(VecBuffer::new()).await
    }

    /// Lookup by name finds stored keys only, and `oldest` reports the smallest timestamp.
    async fn do_test_keys(ds: VecBuffer) -> Result<(), Box<dyn Error>> {
        // A metric with a fixed, very old timestamp
        let fixed = Metric {
            name: String::from("test.case.one"),
            value: String::from("null"),
            time: 1234.0,
        };
        ds.add_metric(fixed, TimeStatus::None).await;
        // And one stamped with the current time
        let current = mtrc("test.case.two", "test.value");
        ds.add_metric(current, TimeStatus::TimeFail).await;

        // Stored keys are found
        let one = ds.get_metric("test.case.one").await;
        assert!(one.is_some(), "Test case one should exist");
        let two = ds.get_metric("test.case.two").await;
        assert!(two.is_some(), "Test case two should exist");

        // An unknown key is not
        let three = ds.get_metric("test.case.three").await;
        assert!(three.is_none(), "Three should not exist");

        assert_eq!(
            1234.0,
            ds.oldest().expect("Locked?"),
            "1234 should be the oldest value"
        );
        Ok(())
    }
}