use log::error;
use crate::types::Metric;
use async_std::sync::{Arc, Mutex};
/// Error type that `VecBuffer` declares as its `Buffer::Error`.
#[derive(thiserror::Error, Debug)]
pub enum VecError {
/// Displayed as "Mutex is poisoned".
// NOTE(review): none of the `VecBuffer` methods visible in this file ever
// construct this variant (they all return `Ok`) — confirm it is still needed.
#[error("Mutex is poisoned")]
Mutex,
}
use async_trait::async_trait;
/// One buffered entry: a metric paired with the status string supplied to `add_metric`.
type BufMetric = (Metric, String);
/// A collection of buffered entries, as stored by `VecBuffer` and returned by `consume_metrics`.
type BufMetrics = Vec<BufMetric>;
/// Asynchronous store of `(Metric, status)` entries.
///
/// Every operation returns a `Result` so that an implementation can report
/// failures through its own `Error` type.
#[async_trait]
pub(crate) trait Buffer {
/// Error type returned by every operation of the implementation.
type Error: std::error::Error + Send + 'static;
/// Type produced by `new`; the `VecBuffer` implementation sets this to itself.
type Store: Send + 'static;
/// Creates a new store.
async fn new() -> Result<Self::Store, Self::Error>;
/// Records `metric` together with its `status` string.
async fn add_metric(&self, metric: Metric, status: &str) -> Result<(), Self::Error>;
/// Returns the buffered entries. The `VecBuffer` implementation removes
/// everything it returns, leaving the buffer empty.
async fn consume_metrics(&self) -> Result<BufMetrics, Self::Error>;
/// Reports whether any buffered metric has the given `name`
/// (this is how `VecBuffer` implements it).
async fn has_name(&self, name: &str) -> Result<bool, Self::Error>;
/// Returns the number of buffered entries.
async fn count(&self) -> Result<usize, Self::Error>;
/// Returns the timestamp of the oldest entry. The `VecBuffer` implementation
/// yields the smallest `time` among buffered metrics, or `0` when empty.
async fn oldest(&self) -> Result<i64, Self::Error>;
}
/// In-memory `Buffer` implementation that keeps its entries in a `Vec`
/// guarded by an async mutex.
pub(crate) struct VecBuffer {
/// Buffered entries; each `&self` method of the `Buffer` impl locks this
/// mutex before reading or modifying the vector.
data: Arc<Mutex<BufMetrics>>,
}
#[async_trait]
impl Buffer for VecBuffer {
type Error = VecError;
type Store = VecBuffer;
async fn new() -> Result<Self, Self::Error> {
let inner: BufMetrics = Vec::with_capacity(100);
let data = Arc::new(Mutex::new(inner));
Ok(Self { data })
}
async fn add_metric(&self, metric: Metric, status: &str) -> Result<(), Self::Error> {
let pair = (metric, status.to_string());
{
let inner = &mut self.data.lock().await;
inner.push(pair);
}
Ok(())
}
async fn has_name(&self, name: &str) -> Result<bool, Self::Error> {
let res = {
let inner = &self.data.lock().await;
inner.iter().any(|m| m.0.name == name)
};
Ok(res)
}
async fn count(&self) -> Result<usize, Self::Error> {
let res = {
let inner = &self.data.lock().await;
inner.len()
};
Ok(res)
}
async fn oldest(&self) -> Result<i64, Self::Error> {
let value = {
let inner = &self.data.lock().await;
let eldest = inner.iter().min_by_key(|m| m.0.time);
if let Some(tup) = eldest {
tup.0.time
} else {
0
}
};
Ok(value)
}
async fn consume_metrics(&self) -> Result<BufMetrics, Self::Error> {
let res = {
let inner = &mut self.data.lock().await;
inner.drain(..).collect()
};
Ok(res)
}
}
/// Returns the current Unix time in whole seconds as a signed integer.
///
/// Yields `0` when the system clock reports a time before the Unix epoch.
/// The seconds count is converted with a checked conversion, so a value that
/// does not fit in an `i64` saturates at `i64::MAX` instead of wrapping to a
/// negative number.
#[must_use]
pub fn inixtime() -> i64 {
    // Explicit import keeps `i64::try_from` available on editions where
    // `TryFrom` is not part of the prelude.
    use std::convert::TryFrom;
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map_or(0, |elapsed| {
            i64::try_from(elapsed.as_secs()).unwrap_or(i64::MAX)
        })
}
/// Returns the current Unix time in whole seconds.
///
/// Yields `0` when the system clock reports a time before the Unix epoch.
#[must_use]
pub fn unixtime() -> u64 {
    use std::time::{SystemTime, UNIX_EPOCH};

    SystemTime::now()
        .duration_since(UNIX_EPOCH)
        .map(|elapsed| elapsed.as_secs())
        .unwrap_or(0)
}
#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    /// Builds a metric with the given name and value, stamped with the current time.
    fn mtrc(name: &str, value: &str) -> Metric {
        Metric {
            name: name.to_owned(),
            value: value.to_owned(),
            time: inixtime(),
        }
    }

    /// Runs the insert/drain scenario against `VecBuffer`.
    #[async_std::test]
    async fn test_insert_remove() -> Result<(), Box<dyn Error>> {
        let buffer = VecBuffer::new().await?;
        do_insert_remove(buffer).await
    }

    /// Inserts two metrics, then checks that draining returns both and leaves
    /// the buffer empty for the next drain.
    async fn do_insert_remove<B: Buffer>(ds: B) -> Result<(), Box<dyn Error>> {
        for status in ["NONE", "TIMEFAIL"] {
            let metric = mtrc("test.case", "test.value");
            ds.add_metric(metric, status).await?;
        }
        assert_eq!(2, ds.count().await?);

        let first_drain = ds.consume_metrics().await?;
        assert_eq!(first_drain.len(), 2);
        assert_eq!(0, ds.count().await?);

        let second_drain = ds.consume_metrics().await?;
        assert_eq!(second_drain.len(), 0);
        Ok(())
    }

    /// Runs the name-lookup / oldest-timestamp scenario against `VecBuffer`.
    #[async_std::test]
    async fn test_keys() -> Result<(), Box<dyn Error>> {
        let buffer = VecBuffer::new().await?;
        println!("Testing vectors");
        do_test_keys(buffer).await
    }

    /// Inserts one metric with a fixed old timestamp and one with the current
    /// time, then checks name lookups and the reported oldest timestamp.
    async fn do_test_keys<B: Buffer>(ds: B) -> Result<(), Box<dyn Error>> {
        let fixed_time = Metric {
            name: "test.case.one".to_owned(),
            value: "null".to_owned(),
            time: 1234,
        };
        ds.add_metric(fixed_time, "NONE").await?;

        let current_time = mtrc("test.case.two", "test.value");
        ds.add_metric(current_time, "TIMEFAIL").await?;

        let has_one = ds.has_name("test.case.one").await?;
        assert!(has_one, "Test case one should exist");

        let has_two = ds.has_name("test.case.two").await?;
        assert!(has_two, "Test case two should exist");

        let has_three = ds.has_name("test.case.three").await?;
        assert!(!has_three, "Three should not exist");

        assert_eq!(1234, ds.oldest().await?, "1234 should be the oldest value");
        Ok(())
    }
}