modio_logger_db/
buffer.rs1use crate::types::Metric;
4use async_std::sync::Mutex;
5use std::sync::Arc;
6use tracing::instrument;
7
8type BufMetric = (Metric, TimeStatus);
9type BufMetrics = Vec<BufMetric>;
10
/// Flag stored next to every buffered metric.
///
/// NOTE(review): presumably marks whether the metric's timestamp was taken
/// while the system clock was considered unreliable — confirm against callers.
#[derive(Debug, Clone, Copy)]
pub(crate) enum TimeStatus {
    /// Produced by [`TimeStatus::from_bool`] when called with `true`.
    TimeFail,
    /// Produced by [`TimeStatus::from_bool`] when called with `false`.
    None,
}

impl TimeStatus {
    /// Returns the upper-case label of the variant (`"TIMEFAIL"` or `"NONE"`).
    ///
    /// The labels are string literals, so the result is `'static` and may
    /// outlive the `TimeStatus` value it was obtained from.
    pub const fn as_str(&self) -> &'static str {
        match self {
            Self::TimeFail => "TIMEFAIL",
            Self::None => "NONE",
        }
    }

    /// Maps `true` to [`TimeStatus::TimeFail`] and `false` to [`TimeStatus::None`].
    pub const fn from_bool(timefail: bool) -> Self {
        if timefail {
            Self::TimeFail
        } else {
            Self::None
        }
    }
}
31
32#[derive(Clone)]
33pub(crate) struct VecBuffer {
34 data: Arc<Mutex<BufMetrics>>,
35}
36impl std::fmt::Debug for VecBuffer {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 let mut d = f.debug_struct("VecBuffer");
41 match self.data.try_lock() {
42 Some(inner) => {
43 d.field("len", &inner.len());
44 }
45 _ => {
46 d.field("len", &"<Locked>");
47 }
48 }
49 d.finish()
50 }
51}
52
53fn metric_cmp(a: &BufMetric, b: &BufMetric) -> std::cmp::Ordering {
54 a.0.time.partial_cmp(&b.0.time).expect("NaN or Inf time")
55}
56
57impl VecBuffer {
58 pub fn new() -> Self {
59 let inner = Vec::with_capacity(100);
60 let data = Arc::new(Mutex::new(inner));
61 Self { data }
62 }
63
64 #[instrument(skip_all)]
65 pub async fn add_metric(&self, metric: Metric, status: TimeStatus) {
66 let pair = (metric, status);
67 {
68 let inner = &mut self.data.lock().await;
69 inner.push(pair);
70 }
71 }
72
73 #[instrument(skip_all)]
74 pub async fn get_metric(&self, name: &str) -> Option<Metric> {
75 let res = {
76 let inner = &self.data.lock().await;
77 inner
78 .iter()
79 .filter(|m| m.0.name == name) .max_by(|a, b| metric_cmp(a, b)) .map(|m| m.0.clone()) };
83 res
84 }
85
86 #[instrument(skip_all)]
87 pub fn count(&self) -> Option<usize> {
88 let res = {
91 let inner = &self.data.try_lock()?;
92 inner.len()
93 };
94 Some(res)
95 }
96
97 #[instrument(skip_all)]
99 pub fn oldest(&self) -> Option<f64> {
100 let res = {
101 let inner = &self.data.try_lock()?;
102 inner
103 .iter()
104 .map(|m| m.0.time)
105 .reduce(f64::min)
106 .map_or(0.0, |ts| ts)
107 };
108 Some(res)
109 }
110
111 #[instrument(skip_all)]
112 pub async fn consume_metrics(&self) -> BufMetrics {
113 {
114 let inner = &mut self.data.lock().await;
115 inner.drain(..).collect()
116 }
117 }
118
119 #[instrument(skip_all)]
120 pub async fn add_metrics(&self, mut data: BufMetrics) {
121 data.sort_unstable_by(metric_cmp);
122 {
123 let inner = &mut self.data.lock().await;
124 inner.append(&mut data);
125 inner.sort_unstable_by(metric_cmp);
127 }
128 }
129}
130
/// Returns the current time as whole seconds since the Unix epoch, as an `i64`.
///
/// Yields `0` when the system clock reports a time before the epoch.
#[must_use]
pub fn inixtime() -> i64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        // The u64 -> i64 cast is accepted as-is for the clippy lint.
        #[allow(clippy::cast_possible_wrap)]
        Ok(elapsed) => elapsed.as_secs() as i64,
        Err(_) => 0,
    }
}
140
/// Returns the current time as whole seconds since the Unix epoch.
///
/// Yields `0` when the system clock reports a time before the epoch.
#[must_use]
pub fn unixtime() -> u64 {
    use std::time::SystemTime;
    let since_epoch = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH);
    since_epoch.map(|elapsed| elapsed.as_secs()).unwrap_or(0)
}
148
/// Returns the current time as fractional seconds since the Unix epoch.
///
/// Yields `0.0` when the system clock reports a time before the epoch.
#[must_use]
pub fn fxtime() -> f64 {
    use std::time::SystemTime;
    if let Ok(elapsed) = SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        elapsed.as_secs_f64()
    } else {
        0.0
    }
}
157
/// Returns the current time as seconds since the Unix epoch, with the
/// fractional part limited to whole milliseconds.
///
/// Yields `0.0` when the system clock reports a time before the epoch.
#[must_use]
pub fn fxtime_ms() -> f64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(elapsed) => {
            // The u64 -> f64 cast is accepted as-is for the clippy lint.
            #[allow(clippy::cast_precision_loss)]
            let whole_secs = elapsed.as_secs() as f64;
            let millis_part = f64::from(elapsed.subsec_millis()) / 1_000.0;
            whole_secs + millis_part
        }
        Err(_) => 0.0,
    }
}
169
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;
    use test_log::test;
    use timeout_macro::timeouttest;

    /// Builds a metric with the given name and value, stamped with the current time.
    fn mtrc(name: &str, value: &str) -> Metric {
        let time = fxtime();
        Metric {
            name: name.to_string(),
            value: value.to_string(),
            time,
        }
    }

    /// Stores two `test.case` metrics, one per `TimeStatus` variant.
    async fn add_two_metrics(buffer: &VecBuffer) {
        buffer
            .add_metric(mtrc("test.case", "test.value"), TimeStatus::None)
            .await;
        buffer
            .add_metric(mtrc("test.case", "test.value"), TimeStatus::TimeFail)
            .await;
    }

    #[test(timeouttest)]
    async fn test_insert_remove() -> Result<(), Box<dyn Error>> {
        do_insert_remove(VecBuffer::new()).await
    }

    /// Inserted metrics are counted, drained exactly once, and gone afterwards.
    async fn do_insert_remove(ds: VecBuffer) -> Result<(), Box<dyn Error>> {
        add_two_metrics(&ds).await;
        assert_eq!(2, ds.count().expect("Locked?"));

        let first_drain = ds.consume_metrics().await;
        assert_eq!(first_drain.len(), 2);
        assert_eq!(0, ds.count().expect("Locked?"));

        // A second drain has nothing left to return.
        let second_drain = ds.consume_metrics().await;
        assert_eq!(second_drain.len(), 0);
        Ok(())
    }

    /// Drained metrics can be handed back with `add_metrics`.
    #[test(timeouttest)]
    async fn test_consume_append() -> Result<(), Box<dyn Error>> {
        let ds = VecBuffer::new();
        add_two_metrics(&ds).await;
        assert_eq!(2, ds.count().expect("Locked?"));

        let drained = ds.consume_metrics().await;
        assert_eq!(drained.len(), 2);
        assert_eq!(0, ds.count().expect("Locked?"));

        ds.add_metrics(drained).await;
        assert_eq!(2, ds.count().expect("Locked?"));
        Ok(())
    }

    #[test(timeouttest)]
    async fn test_keys() -> Result<(), Box<dyn Error>> {
        do_test_keys(VecBuffer::new()).await
    }

    /// Lookup by name finds stored metrics only, and `oldest` reports the
    /// smallest stored timestamp.
    async fn do_test_keys(ds: VecBuffer) -> Result<(), Box<dyn Error>> {
        let fixed_time = Metric {
            name: "test.case.one".to_string(),
            value: "null".to_string(),
            time: 1234.0,
        };
        ds.add_metric(fixed_time, TimeStatus::None).await;
        ds.add_metric(mtrc("test.case.two", "test.value"), TimeStatus::TimeFail)
            .await;

        let one = ds.get_metric("test.case.one").await;
        assert!(one.is_some(), "Test case one should exist");

        let two = ds.get_metric("test.case.two").await;
        assert!(two.is_some(), "Test case two should exist");

        let three = ds.get_metric("test.case.three").await;
        assert!(three.is_none(), "Three should not exist");

        assert_eq!(
            1234.0,
            ds.oldest().expect("Locked?"),
            "1234 should be the oldest value"
        );
        Ok(())
    }
}