modio-logger-db 0.5.2

modio-logger Dbus service
Documentation
// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
// SPDX-License-Identifier: AGPL-3.0-or-later
use log::error;

use crate::types::Metric;

use async_std::sync::{Arc, Mutex};

#[derive(thiserror::Error, Debug)]
pub enum VecError {
    #[error("Mutex is poisoned")]
    Mutex,
}

use async_trait::async_trait;
// Async traits are unstable, and writing them manually is ugly as sin.  I cheat and use a macro

type BufMetric = (Metric, String);
type BufMetrics = Vec<BufMetric>;

#[async_trait]
pub(crate) trait Buffer {
    type Error: std::error::Error + Send + 'static;
    type Store: Send + 'static;
    async fn new() -> Result<Self::Store, Self::Error>;
    async fn add_metric(&self, metric: Metric, status: &str) -> Result<(), Self::Error>;
    async fn consume_metrics(&self) -> Result<BufMetrics, Self::Error>;
    async fn has_name(&self, name: &str) -> Result<bool, Self::Error>;
    async fn count(&self) -> Result<usize, Self::Error>;
    async fn oldest(&self) -> Result<i64, Self::Error>;
}

pub(crate) struct VecBuffer {
    data: Arc<Mutex<BufMetrics>>,
}

#[async_trait]
impl Buffer for VecBuffer {
    type Error = VecError;
    type Store = VecBuffer;

    async fn new() -> Result<Self, Self::Error> {
        let inner: BufMetrics = Vec::with_capacity(100);
        let data = Arc::new(Mutex::new(inner));

        Ok(Self { data })
    }

    async fn add_metric(&self, metric: Metric, status: &str) -> Result<(), Self::Error> {
        let pair = (metric, status.to_string());
        {
            let inner = &mut self.data.lock().await;
            inner.push(pair);
        }
        Ok(())
    }

    async fn has_name(&self, name: &str) -> Result<bool, Self::Error> {
        let res = {
            let inner = &self.data.lock().await;
            inner.iter().any(|m| m.0.name == name)
        };
        Ok(res)
    }

    async fn count(&self) -> Result<usize, Self::Error> {
        let res = {
            let inner = &self.data.lock().await;
            inner.len()
        };
        Ok(res)
    }

    async fn oldest(&self) -> Result<i64, Self::Error> {
        let value = {
            let inner = &self.data.lock().await;
            let eldest = inner.iter().min_by_key(|m| m.0.time);
            if let Some(tup) = eldest {
                tup.0.time
            } else {
                0
            }
        };
        Ok(value)
    }

    async fn consume_metrics(&self) -> Result<BufMetrics, Self::Error> {
        let res = {
            let inner = &mut self.data.lock().await;
            inner.drain(..).collect()
        };
        Ok(res)
    }
}

#[must_use]
pub fn inixtime() -> i64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        // If time wraps we have a bigger problem...
        #[allow(clippy::cast_possible_wrap)]
        Ok(n) => n.as_secs() as i64,
        Err(_) => 0,
    }
}

#[must_use]
pub fn unixtime() -> u64 {
    use std::time::SystemTime;
    match SystemTime::now().duration_since(SystemTime::UNIX_EPOCH) {
        Ok(n) => n.as_secs(),
        Err(_) => 0,
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::error::Error;

    fn mtrc(name: &str, value: &str) -> Metric {
        Metric {
            name: name.to_string(),
            value: value.to_string(),
            time: inixtime(),
        }
    }

    #[async_std::test]
    async fn test_insert_remove() -> Result<(), Box<dyn Error>> {
        let ds = VecBuffer::new().await?;
        do_insert_remove(ds).await?;

        Ok(())
    }

    async fn do_insert_remove<T>(ds: T) -> Result<(), Box<dyn Error>>
    where
        T: Buffer,
    {
        let m = mtrc("test.case", "test.value");
        ds.add_metric(m, "NONE").await?;
        let m = mtrc("test.case", "test.value");
        ds.add_metric(m, "TIMEFAIL").await?;
        // Should have two metrcis
        assert_eq!(2, ds.count().await?);

        // Drain should have two metrics too
        let drain = ds.consume_metrics().await?;
        assert_eq!(drain.len(), 2);

        // Should have zero metrics
        assert_eq!(0, ds.count().await?);

        // Should be zero when draining too
        let drain = ds.consume_metrics().await?;
        assert_eq!(drain.len(), 0);
        Ok(())
    }

    #[async_std::test]
    async fn test_keys() -> Result<(), Box<dyn Error>> {
        let ds = VecBuffer::new().await?;
        println!("Testing vectors");
        do_test_keys(ds).await?;
        Ok(())
    }

    async fn do_test_keys<Store>(ds: Store) -> Result<(), Box<dyn Error>>
    where
        Store: Buffer,
    {
        let m = Metric {
            name: "test.case.one".to_string(),
            value: "null".to_string(),
            time: 1234,
        };
        ds.add_metric(m, "NONE").await?;
        let m = mtrc("test.case.two", "test.value");
        ds.add_metric(m, "TIMEFAIL").await?;

        // Should exist
        assert!(
            ds.has_name("test.case.one").await?,
            "Test case one should exist"
        );
        assert!(
            ds.has_name("test.case.two").await?,
            "Test case two should exist"
        );
        // Should not exist
        assert!(
            !ds.has_name("test.case.three").await?,
            "Three should not exist"
        );
        assert_eq!(1234, ds.oldest().await?, "1234 should be the oldest value");
        Ok(())
    }
}