// modio-logger 0.5.2
//
// modio-logger D-Bus service
// Documentation
// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
// SPDX-License-Identifier: AGPL-3.0-or-later
#![allow(clippy::module_name_repetitions)]

use log::{debug, error, info, warn};

use fsipc::legacy::{PreparedPoint, Transaction};
use fsipc::unixtime;

use super::timefail;
use modio_logger_db::Datastore;

use zbus::{dbus_interface, SignalContext};

mod errors;
mod keys;
pub use errors::LogErr;

mod ping;
pub use ping::LoggerPing;
mod builder;
pub use builder::Builder;

/// D-Bus service object backing the `se.modio.logger.fsipc` interface.
///
/// Holds the datastore that every interface method talks to, and the
/// timefail state that is consulted when values and transactions are recorded.
pub struct Logger {
    /// Storage backend for log data and transactions.
    ds: Datastore,
    /// Timefail state, queried through `is_timefail()` and `is_adjust()`.
    timefail: timefail::Timefail,
}

impl Logger {
    /// Create a logger on top of an already opened datastore.
    ///
    /// If `timefail` reports that the system is in timefail, all pending
    /// change requests are failed first. Queued transactions are then failed
    /// unconditionally before the logger is handed back.
    ///
    /// # Errors
    ///
    /// Returns a [`LogErr`] if any of the datastore calls fail.
    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
        let in_timefail = timefail.is_timefail();
        if in_timefail {
            info!("Failing all pending change requests due to TIMEFAIL");
            ds.transaction_fail_pending().await?;
        }
        ds.fail_queued_transactions().await?;

        let logger = Self { ds, timefail };
        Ok(logger)
    }

    /// Housekeeping pass, invoked through [`call_periodic`].
    ///
    /// In order: persists data, applies a pending time adjustment (if any),
    /// fails queued transactions, deletes old transactions and old log data,
    /// and finally lets the datastore decide whether to vacuum or shrink.
    /// The first failing step aborts the pass.
    async fn periodic(&self) -> Result<(), LogErr> {
        info!("Doing periodic stuff like timefail");
        self.ds.persist_data().await?;

        // Only ask for the adjustment value when the adjust marker is set.
        let pending_adjust = if self.timefail.is_adjust() {
            self.timefail.get_adjust().await
        } else {
            None
        };
        if let Some(adjust) = pending_adjust {
            info!("Time jump has happened, adjusting data with {}", adjust);
            self.ds.fix_timefail(adjust).await?;
            // Remove the marker only after the data has been fixed.
            self.timefail.remove_adjust().await?;
        }

        let failed = self.ds.fail_queued_transactions().await?;
        info!("Failed {} queued transactions from db", failed);

        let dropped_transactions = self.ds.delete_old_transactions().await?;
        info!("Removed {} old transactions from db", dropped_transactions);

        let dropped_values = self.ds.delete_old_logdata().await?;
        info!("Removed {} old log values from db", dropped_values);

        self.ds.need_vacuum_or_shrink().await?;
        Ok(())
    }

    /// Start building a `Logger` through its [`Builder`].
    #[must_use]
    pub fn builder() -> Builder {
        Builder::new()
    }
}

/// Read the hardware address of the `wan` interface from sysfs.
///
/// The file content is lowercased and every character that is not a
/// lowercase hex digit (separators, trailing newline) is dropped. If the file
/// cannot be read, an all-zero address is used instead, so this never fails.
async fn get_mac() -> String {
    use async_std::fs;
    let wan = std::path::Path::new("/sys/class/net/wan/address");
    let raw = fs::read_to_string(&wan).await.map_or_else(
        |_| String::from("00:00:00:00:00:00"),
        |data| data.to_lowercase(),
    );
    raw.chars()
        .filter(|&c| matches!(c, '0'..='9' | 'a'..='f'))
        .collect()
}

#[dbus_interface(name = "se.modio.logger.fsipc")]
impl Logger {
    /// Liveness check: always answers with the same fixed string.
    #[allow(clippy::unused_self)]
    const fn ping(&self) -> &str {
        "Ping? Pong"
    }
    /// Report whether `key` passes the key validation rules.
    ///
    /// Never fails: a rejected key is reported as `false` instead of an error.
    // Takes `&self`: nothing is mutated here, and a `&mut self` receiver would
    // demand exclusive access to the interface for what is a pure check.
    #[allow(clippy::unused_self)]
    fn valid_key(&self, key: &str) -> bool {
        keys::valid_key(key).is_ok()
    }

    /// Return the box identifier, which is whatever `get_mac` produces.
    #[allow(clippy::unused_self)]
    async fn get_boxid(&self) -> String {
        get_mac().await
    }

    /// Fetch the last stored datapoint for `key`.
    ///
    /// The measure is returned as a one element tuple. An invalid key is
    /// rejected by the key check; any datastore error is reported as
    /// `LogErr::NotFound`, whatever its cause.
    async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
        keys::valid_key(key)?;
        self.ds
            .get_last_datapoint(key)
            .await
            .map(|dat| (fsipc::legacy::Measure::from(dat),))
            .map_err(|_e| LogErr::NotFound("Key not found in storage".to_string()))
    }

    /// Return the latest log data from the datastore, converted to measures.
    async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
        let rows = self.ds.get_latest_logdata().await?;
        Ok(rows
            .into_iter()
            .map(fsipc::legacy::Measure::from)
            .collect())
    }

    /// Signal sent when a new value is stored
    ///
    /// Carries the key, the stored value and the timestamp that the emitting
    /// method used. Emitted by `store` and `store_with_time`, and only for
    /// keys that do not start with `modio.`.
    #[dbus_interface(signal)]
    async fn store_signal(
        ctxt: &SignalContext<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> zbus::Result<()>;

    /// Store `value` under `key`, stamped with the current time.
    ///
    /// The current timefail state is recorded alongside the value. After a
    /// successful insert a `store_signal` is emitted, except for keys that
    /// start with `modio.`.
    async fn store(
        &self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        value: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        let now = unixtime();
        let in_timefail = self.timefail.is_timefail();
        // if time wraps we have another problem.
        #[allow(clippy::cast_possible_wrap)]
        let stored_at = now as i64;
        self.ds.insert(key, value, stored_at, in_timefail).await?;

        // Keys with the "modio." prefix are stored but never announced.
        if key.starts_with("modio.") {
            return Ok(());
        }
        Self::store_signal(&ctxt, key, value, now).await?;
        Ok(())
    }

    /// Store `value` under `key` with a caller supplied timestamp.
    ///
    /// Behaves like `store`, except that `when` comes from the caller and the
    /// value is never marked as timefail. After a successful insert a
    /// `store_signal` is emitted, except for keys that start with `modio.`.
    // Takes `&self`, like `store`: the insert goes through a shared reference
    // there too, so there is no reason to demand exclusive access here.
    async fn store_with_time(
        &self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        value: &str,
        when: u64,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        // timefail is always false when storing with timestamp, as the timestamp is expected to be
        // correct from another source.
        let timefail = false;

        // if time wraps we have another problem.
        #[allow(clippy::cast_possible_wrap)]
        let db_when = when as i64;
        self.ds.insert(key, value, db_when, timefail).await?;

        // Keys with the "modio." prefix are stored but never announced.
        if !key.starts_with("modio.") {
            Self::store_signal(&ctxt, key, value, when).await?;
        }
        Ok(())
    }
    /// Signal sent when a new transaction is added
    ///
    /// Carries the key of the transaction. Emitted by `transaction_add` after
    /// the transaction has been handed to the datastore; duplicates that are
    /// ignored do not emit it.
    #[dbus_interface(signal)]
    async fn transaction_added(ctxt: &SignalContext<'_>, key: &str) -> zbus::Result<()>;

    /// Register a change request for `key`, identified by `token`.
    ///
    /// Both the key and the token are validated first. A token that the
    /// datastore already knows is logged and ignored, and the call still
    /// succeeds. Otherwise the transaction is stored and `transaction_added`
    /// is emitted.
    async fn transaction_add(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalContext<'_>,
        key: &str,
        expected: &str,
        target: &str,
        token: &str,
    ) -> Result<(), LogErr> {
        keys::valid_key(key)?;
        keys::valid_token(token)?;

        let already_known = self.ds.has_transaction(token).await?;
        if already_known {
            warn!(
                "Duplicate transaction (key: {}, token: {}) Ignoring for backwards compatibility.",
                key, token
            );
        } else {
            self.ds
                .transaction_add(key, expected, target, token)
                .await?;
            Self::transaction_added(&ctxt, key).await?;
        }
        Ok(())
    }

    /// Return the transactions the datastore yields for `prefix`.
    ///
    /// An empty result is a success, not an error.
    async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
        debug!("Retrieving transactions beginning with {}", prefix);
        let rows = self.ds.transaction_get(prefix).await?;
        Ok(rows.into_iter().map(Transaction::from).collect())
    }

    /// Mark the transaction with id `t_id` as failed.
    ///
    /// The current timefail state is passed along to the datastore. If the
    /// datastore reports that nothing was affected, `LogErr::NotFound` is
    /// returned.
    async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, failed", t_id);
        let in_timefail = self.timefail.is_timefail();

        // if it wraps, that is fine.
        #[allow(clippy::cast_possible_wrap)]
        let db_id = t_id as i64;
        let affected = self.ds.transaction_fail(db_id, in_timefail).await?;
        if affected > 0 {
            return Ok(());
        }
        Err(LogErr::NotFound("No such ID".into()))
    }

    /// Mark the transaction with id `t_id` as passed.
    ///
    /// The current timefail state is passed along to the datastore. If the
    /// datastore reports that nothing was affected, `LogErr::NotFound` is
    /// returned.
    async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
        debug!("Marking transaction: t_id={}, passed ", t_id);
        let in_timefail = self.timefail.is_timefail();

        // if it wraps, that is fine.
        #[allow(clippy::cast_possible_wrap)]
        let db_id = t_id as i64;
        let affected = self.ds.transaction_pass(db_id, in_timefail).await?;
        if affected > 0 {
            return Ok(());
        }
        Err(LogErr::NotFound("No such ID".into()))
    }

    /// Fetch a batch of at most `maximum` datapoints from the datastore.
    ///
    /// `maximum` must pass `prepare_range_check`. Fetching does not remove
    /// anything; removal is done separately through `remove_prepared`.
    async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let batch = self.ds.get_batch(maximum).await?;
        Ok(batch.into_iter().map(PreparedPoint::from).collect())
    }

    /// Fetch a batch of at most `maximum` datapoints from the internal batch
    /// of the datastore.
    ///
    /// `maximum` must pass `prepare_range_check`. Fetching does not remove
    /// anything; removal is done separately through `remove_prepared`.
    async fn prepare_modio_datapoints(
        &mut self,
        maximum: u32,
    ) -> Result<Vec<PreparedPoint>, LogErr> {
        prepare_range_check(maximum)?;
        let batch = self.ds.get_internal_batch(maximum).await?;
        Ok(batch.into_iter().map(PreparedPoint::from).collect())
    }

    /// Drop the datapoints whose ids are listed in `items`.
    ///
    /// An empty list and a list containing a negative id are both rejected
    /// with `LogErr::NotFound` before the datastore is touched.
    async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
        if items.is_empty() {
            return Err(LogErr::NotFound("Empty set".to_string()));
        }
        let has_negative_id = items.iter().any(|id| *id < 0_i64);
        if has_negative_id {
            return Err(LogErr::NotFound("Invalid index".to_string()));
        }
        self.ds.drop_batch(&items).await?;
        Ok(())
    }
}

/// Validate the batch size requested from `prepare_datapoints` and
/// `prepare_modio_datapoints`.
///
/// Accepts 1 through 250 inclusive.
///
/// Errors:
///    `LogErr::NotFound` with the reason "Too small" for 0, and "Too big" for
///    anything above 250.
fn prepare_range_check(num: u32) -> Result<(), LogErr> {
    if num == 0 {
        return Err(LogErr::NotFound("Too small".to_string()));
    }
    if num > 250 {
        return Err(LogErr::NotFound("Too big".to_string()));
    }
    Ok(())
}

pub async fn call_periodic(iface: zbus::InterfaceDerefMut<'_, Logger>) -> Result<(), zbus::Error> {
    let res = iface.periodic().await;
    //  This should be removed and we should implement the trait on LogErr or just
    //  return a zbus::Error from the periodic function.
    match res {
        Ok(o) => Ok(o),
        Err(e) => {
            error!("Periodic task failed {}", e);
            let err = zbus::Error::Unsupported;
            Err(err)
        }
    }
}

#[cfg(test)]
pub mod tests {
    use super::*;

    use crate::testing::{launch_server, test_server_with_paths, Tempbase, TestPaths};
    use fsipc::legacy::fsipcProxy;
    use std::error::Error;

    type TestResult = Result<(), Box<dyn Error>>;
    use futures_util::future::FutureExt;
    use std::future::Future;

    /// Launch a server, run one testcase against it and tear the server down.
    ///
    /// `func` is a function or async closure that takes the `fsipcProxy` and
    /// returns a `TestResult`. `line` is forwarded to `launch_server`; the
    /// tests pass `line!()` for it.
    ///
    /// The server is shut down whether or not the testcase succeeded, and the
    /// testcase's own result is what gets returned.
    async fn run_testcase<'a, F, T>(line: u32, func: F) -> TestResult
    where
        F: FnOnce(fsipcProxy<'a>) -> T,
        T: Future<Output = TestResult> + Send + 'static,
    {
        let (proxy, shutdown, server_task) = launch_server(line).await?;
        let outcome = func(proxy).await;
        shutdown.notify(1);
        server_task.cancel().await;
        outcome
    }

    #[async_std::test]
    /// Smoke test for `run_testcase` itself, using a call that is expected to
    /// be answered with an error.
    async fn use_run_testcase() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            let reply = proxy.done().await;
            assert!(reply.is_err());
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Two consecutive pings both get the fixed reply.
    async fn ping_pong_test() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            let ping_one = proxy.ping().await?;
            let ping_two = proxy.ping().await?;
            assert_eq!(ping_one, "Ping? Pong");
            assert_eq!(ping_two, "Ping? Pong");
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Calling `done` on the proxy must come back as an error.
    async fn done_gives_error_test() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            let outcome = proxy.done().await;
            assert!(outcome.is_err());
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// A stored value can be read back under the same key.
    async fn store_retrieve() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            let key = "test.key";
            let value = "abc123";
            proxy.store(key, value).await?;

            let measure = proxy.retrieve(key).await?;
            assert_eq!(measure.key, key);
            assert_eq!(measure.value, value);
            Ok(())
        }

        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// With two timestamped values on one key, `retrieve` returns the later one.
    async fn store_buffer() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            let key = "test.key";
            proxy.store_with_time(key, "abc123", 1_494_602_107).await?;
            proxy.store_with_time(key, "abc1234", 1_494_602_108).await?;

            let latest = proxy.retrieve(key).await?;
            assert_eq!(latest.key, key);
            assert_eq!(latest.value, "abc1234");
            assert_eq!(latest.timestamp, 1_494_602_108);
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// `retrieve_all` yields one entry per key, holding the latest value.
    async fn retrieve_all_test() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            proxy
                .store_with_time("test.key", "abc123", 1_494_602_107)
                .await?;
            proxy
                .store_with_time("test.key", "abc1234", 1_494_602_108)
                .await?;
            proxy.store("test.key2", "abcdefg").await?;

            let measures = proxy.retrieve_all().await?;
            assert_eq!(measures.len(), 2);

            let timed = measures.first().expect("Should have value");
            assert_eq!(timed.key, "test.key");
            assert_eq!(timed.value, "abc1234");
            assert_eq!(timed.timestamp, 1_494_602_108);

            let untimed = measures.get(1).expect("Should have value");
            assert_eq!(untimed.key, "test.key2");
            assert_eq!(untimed.value, "abcdefg");
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Only transactions matching the requested prefix are returned.
    async fn transaction_adding_test() -> TestResult {
        async fn inner(proxy: fsipcProxy<'_>) -> TestResult {
            proxy
                .transaction_add("test.test.one", "first", "second", "012")
                .await?;
            // Different prefix: must not show up in the listing below.
            proxy
                .transaction_add("dummy.test.one", "should not", "be present", "013")
                .await?;

            let listed = proxy.transaction_get("test.test").await?;
            assert_eq!(listed.len(), 1);
            let only = &listed[0];
            assert_eq!(only.key, "test.test.one");
            assert_eq!(only.t_id, 1, "Transaction ID mismatch");
            Ok(())
        }

        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Adding the same token twice succeeds but stores one transaction only.
    async fn transaction_dupe_adding_test() -> TestResult {
        async fn transaction_dupe_adding(proxy: fsipcProxy<'_>) -> TestResult {
            let token = "1638290048";
            proxy
                .transaction_add("test.test.one", "first", "second", token)
                .await?;
            proxy
                .transaction_add("test.test.one", "first", "second", token)
                .await
                .expect("duplicated tokens should not cause error");

            let listed = proxy.transaction_get("test.test").await?;
            assert_eq!(listed.len(), 1);
            Ok(())
        }

        run_testcase(line!(), transaction_dupe_adding).await
    }

    use futures_util::stream::StreamExt;

    #[async_std::test]
    /// Adding a transaction emits a `transaction_added` signal with its key.
    async fn transaction_signal_test() -> TestResult {
        async fn inner(logger: fsipcProxy<'_>) -> TestResult {
            let mut signals = logger.receive_transaction_added().await?;
            let add_call =
                logger.transaction_add("test.test.transaction_signal", "first", "second", "012");
            let next_signal = signals.next();

            // Drive the call and the signal listener together.
            let (_, received) = futures::future::join(add_call, next_signal).await;
            let signal = received.unwrap();
            let args = signal.args().unwrap();
            assert_eq!(args.key, "test.test.transaction_signal");
            Ok(())
        }
        run_testcase(line!(), inner).await
    }
    #[async_std::test]
    /// Transactions that were failed or passed are no longer listed; a newly
    /// added one is, and it gets the next id.
    async fn transaction_passing_test() -> TestResult {
        async fn inner(logger: fsipcProxy<'_>) -> TestResult {
            logger
                .transaction_add("test.test.one", "first", "second", "012")
                .await?;
            logger
                .transaction_add("test.test.two", "uno", "dos", "0113")
                .await?;

            let pending = logger.transaction_get("test.test").await?;
            logger.transaction_fail(pending[0].t_id).await?;
            logger.transaction_pass(pending[1].t_id).await?;

            logger
                .transaction_add("test.test.three", "etta", "tvåa", "0114")
                .await?;
            let remaining = logger.transaction_get("test.test").await?;
            assert_eq!(remaining.len(), 1);
            let newest = &remaining[0];
            assert_eq!(newest.key, "test.test.three");
            assert_eq!(newest.t_id, 3, "Transaction id mismatch");
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Every key reported by `retrieve_all` can also be fetched individually.
    async fn retrieving_data_test() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            for value in &["first", "second", "third"] {
                ipc.store("test.test.one", *value).await?;
            }
            for value in &["1", "2", "3"] {
                ipc.store("test.test.two", *value).await?;
            }

            let all = ipc.retrieve_all().await?;
            for measure in &all {
                let single = ipc.retrieve(&measure.key).await?;
                assert_eq!(single.key, measure.key);
            }
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Keys that the service must accept. Each key has its own assert so a
    /// failure points at the offending key.
    async fn valid_key_test() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            // Dotted key with a "modio." prefix.
            assert!(ipc.valid_key("modio.software.development").await?);
            // Single segment without any dot.
            assert!(ipc.valid_key("abc").await?);
            // Single character segments.
            assert!(ipc.valid_key("a.b.c").await?);
            // Underscore inside a segment.
            assert!(ipc.valid_key("a_b.c").await?);
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Keys that the service must reject. The call itself still succeeds; it
    /// answers `false`.
    async fn invalid_key_test() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            // Two consecutive dots.
            assert!(!ipc.valid_key("modio..invalid").await?);
            // Leading dot, plus two consecutive dots.
            assert!(!ipc.valid_key(".modio..invalid").await?);
            // Trailing dot.
            assert!(!ipc.valid_key("modio.invalid.").await?);
            // Space at the start of a segment.
            assert!(!ipc.valid_key("modio. invalid").await?);
            // Space in the middle of a segment.
            assert!(!ipc.valid_key("modio.in valid").await?);
            // Trailing space.
            assert!(!ipc.valid_key("modio.invalid ").await?);
            // Leading space.
            assert!(!ipc.valid_key(" modio.invalid").await?);
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Passing a transaction works once; passing the same id again is an error.
    async fn transaction_double() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            let key = "test.test.one";
            let expected = "first";
            let target = "second";
            let token = zbus::Guid::generate();
            // Store the key
            ipc.store(key, expected).await?;

            ipc.transaction_add(key, expected, target, token.as_str())
                .await?;
            let listed = ipc.transaction_get(key).await?;
            let t_id = listed
                .first()
                .expect("Should have at least one transaction")
                .t_id;

            let first_pass = ipc.transaction_pass(t_id).await;
            assert!(first_pass.is_ok());
            let second_pass = ipc.transaction_pass(t_id).await;
            assert!(second_pass.is_err());
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// Walk through a client loop: queue three identical change requests,
    /// pass the ones whose expected value matches the current value, fail the
    /// rest, and check that nothing is left pending afterwards.
    async fn transaction_tests() -> TestResult {
        async fn inner_transactions(ipc: fsipcProxy<'_>) -> TestResult {
            let key = "test.test.one";
            let expected = "first";
            let target = "second";
            // The value the simulated device currently holds.
            let mut current = "first";

            ipc.store(key, current).await?;
            let now = unixtime();

            ipc.store_with_time(key, current, now).await?;

            // A transaction token is either a UUID or a timestamp-as-string.
            for _ in 0..3 {
                let token = zbus::Guid::generate();
                ipc.transaction_add(key, expected, target, token.as_str())
                    .await?;
            }

            let pending = ipc.transaction_get(key).await?;
            // `t_id` is an id internal to the running logger instance, not the
            // token that was supplied when the transaction was added.
            for trn in &pending {
                if trn.key == key {
                    if current == trn.expected {
                        current = &trn.target;
                        ipc.transaction_pass(trn.t_id).await?;
                    } else {
                        ipc.transaction_fail(trn.t_id).await?;
                    }
                }
                ipc.store(key, current).await?;
            }

            // Listing must still succeed, but with nothing left in it.
            let leftover = ipc.transaction_get(key).await?;
            assert_eq!(leftover.len(), 0);

            Ok(())
        }
        run_testcase(line!(), inner_transactions).await
    }

    #[async_std::test]
    /// Stores to keys with the `modio.` prefix must not emit a store signal.
    async fn no_modio_signals() -> TestResult {
        async fn no_modio_signals_inner(ipc: fsipcProxy<'_>) -> TestResult {
            let mut signals = ipc.receive_store_signal().await?;

            // A regular key is announced with a signal.
            ipc.store("test.test.test", "value").await?;
            let announced = signals.next().await.unwrap();
            assert_eq!(announced.args()?.key, "test.test.test");

            // A "modio." key is stored silently, so polling the stream once
            // must not produce anything.
            ipc.store("modio.test.test", "value").await?;
            let silent = signals.next().now_or_never();
            assert!(silent.is_none());
            Ok(())
        }

        run_testcase(line!(), no_modio_signals_inner).await
    }

    #[async_std::test]
    /// Prepared datapoints stay available until they are explicitly removed.
    async fn submit_consume() -> TestResult {
        async fn inner_submit_consume(ipc: fsipcProxy<'_>) -> TestResult {
            for x in 0..5 {
                ipc.store("test.foo", &x.to_string()).await?;
            }

            let batch = ipc.prepare_datapoints(10).await?;
            assert_eq!(batch.len(), 5);

            let oldest = &batch[0];
            assert_eq!(oldest.key, "test.foo");
            assert_eq!(oldest.value, "0");

            // Preparing again must hand out the same amount: nothing has been
            // removed yet.
            let again = ipc.prepare_datapoints(10).await?;
            assert_eq!(again.len(), 5);

            let newest = &batch[4];
            assert_eq!(newest.key, "test.foo");
            assert_eq!(newest.value, "4");

            let to_remove: Vec<i64> = batch.iter().map(|point| point.id).collect();
            ipc.remove_prepared(to_remove).await?;

            let after = ipc.prepare_datapoints(30).await?;
            assert!(after.is_empty());
            Ok(())
        }

        run_testcase(line!(), inner_submit_consume).await
    }

    #[async_std::test]
    /// `modio.` datapoints are batched separately from the regular ones, and
    /// removing them leaves the regular ones in place.
    async fn submit_modio_consume() -> TestResult {
        async fn inner_submit_modio_consume(ipc: fsipcProxy<'_>) -> TestResult {
            for x in 0..5 {
                let value = x.to_string();
                ipc.store("test.foo", &value).await?;
                ipc.store("modio.test.foo", &value).await?;
            }

            let internal = ipc.prepare_modio_datapoints(20).await?;
            assert_eq!(internal.len(), 5);
            for point in &internal {
                assert_eq!(&point.key, "modio.test.foo");
            }

            // Preparing again must hand out the same amount: nothing has been
            // removed yet.
            let again = ipc.prepare_modio_datapoints(10).await?;
            assert_eq!(again.len(), 5);

            // Remove the internal points by the ids they were handed out with.
            let to_remove: Vec<i64> = internal.iter().map(|point| point.id).collect();
            ipc.remove_prepared(to_remove).await?;

            let internal_after = ipc.prepare_modio_datapoints(30).await?;
            assert!(internal_after.is_empty());

            // The regular datapoints were not part of the removal.
            let regular_after = ipc.prepare_datapoints(30).await?;
            assert!(!regular_after.is_empty());
            Ok(())
        }

        run_testcase(line!(), inner_submit_modio_consume).await
    }

    #[async_std::test]
    /// Batches respect the requested maximum and continue where the removed
    /// batch ended.
    async fn test_get_batch() -> TestResult {
        async fn inner_get_batch(ipc: fsipcProxy<'_>) -> TestResult {
            for x in 0..25 {
                ipc.store("test.foo", &x.to_string()).await?;
            }

            let first_batch = ipc.prepare_datapoints(10).await?;
            assert_eq!(first_batch.len(), 10);
            let head = &first_batch[0];
            assert_eq!(head.key, "test.foo");
            assert_eq!(head.value, "0");

            let to_remove: Vec<i64> = first_batch.iter().map(|point| point.id).collect();
            ipc.remove_prepared(to_remove).await?;

            // Request more than we have left, should return only the 25-10=15 items
            let second_batch = ipc.prepare_datapoints(30).await?;
            assert_eq!(second_batch.len(), 15);
            let head = &second_batch[0];
            assert_eq!(head.key, "test.foo");
            assert_eq!(head.value, "10");

            let to_remove: Vec<i64> = second_batch.iter().map(|point| point.id).collect();
            ipc.remove_prepared(to_remove).await?;

            let leftover = ipc.prepare_datapoints(30).await?;
            assert!(leftover.is_empty());
            Ok(())
        }

        run_testcase(line!(), inner_get_batch).await
    }

    #[async_std::test]
    /// Expects the all-zero fallback id, which is what `get_boxid` yields
    /// when there is no readable `wan` interface address.
    ///
    /// On a machine that does have an ethernet interface named "wan" this
    /// test can fail. That is unlikely enough to be accepted for now; if it
    /// happens, disable the test or make the check smarter.
    async fn test_get_mac() -> TestResult {
        async fn inner(logger: fsipcProxy<'_>) -> TestResult {
            let boxid = logger.get_boxid().await?;
            assert_eq!(boxid, "000000000000");
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// A fresh server has neither transactions nor stored values, and asking
    /// for them succeeds with empty results.
    async fn test_empty_transactions() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            let pending = ipc.transaction_get("test").await?;
            assert!(pending.is_empty());
            let measures = ipc.retrieve_all().await?;
            assert!(measures.is_empty());
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    /// A prefix that ends in a dot is accepted by `transaction_get`.
    async fn test_transaction_get_prefix() -> TestResult {
        async fn inner(ipc: fsipcProxy<'_>) -> TestResult {
            let pending = ipc.transaction_get("mbus.").await?;
            assert!(pending.is_empty());
            Ok(())
        }
        run_testcase(line!(), inner).await
    }

    #[async_std::test]
    // Restart test: stores values through one server instance, shuts it down,
    // then starts a second instance on the same paths and checks that every
    // value can still be retrieved. This makes sure that temp data (ram only)
    // is accessible later.
    //
    // It duplicates the server-spawning logic instead of using `run_testcase`,
    // because it has to start the server several times on shared paths.
    //
    // The function takes no arguments: a test entry point has nothing to
    // supply them, and each proxy is created by `spawn_server` below.
    async fn resume_database_test() -> TestResult {
        /// Start a server named `server_name` on `path` and connect to it.
        ///
        /// Returns the connected proxy, the event used to tell the server
        /// that the test is complete, and the handle of the server task.
        async fn spawn_server(
            server_name: &str,
            path: TestPaths,
        ) -> Result<
            (
                fsipcProxy<'static>,
                event_listener::Event,
                async_std::task::JoinHandle<()>,
            ),
            Box<dyn Error>,
        > {
            use crate::LOGGER_PATH;

            let test_complete = event_listener::Event::new();
            let test_waiter = test_complete.listen();

            let server = event_listener::Event::new();
            let server_ready = server.listen();

            let client_name = server_name.to_string();
            let task = async_std::task::spawn(test_server_with_paths(
                server_name.to_string(),
                server,
                test_waiter,
                path,
            ));
            // Do not connect before the server has announced itself.
            server_ready.await;

            let conn = zbus::Connection::session().await?;
            let proxy = fsipcProxy::builder(&conn)
                .path(LOGGER_PATH)?
                .destination(client_name)?
                .build()
                .await?;
            // One round trip to make sure the proxy really reaches the server.
            proxy.ping().await?;
            Ok((proxy, test_complete, task))
        }

        let base = Tempbase::new();
        let server_name = format!("se.modio.logger.TestCase{}", line!());

        let expected = vec![
            ("test.test.one", "1"),
            ("test.test.two", "2"),
            ("test.test.three", "3"),
            ("test.test.four", "4"),
            ("test.test.five", "5"),
            ("test.test.six", "6"),
        ];

        // Spawn server, store some data and then shut it down
        {
            let (logger, done, task) = spawn_server(&server_name, base.paths.clone()).await?;
            {
                for (key, val) in &expected {
                    logger.store(key, val).await?;
                }
                // Read the data to ensure we have it in storage
                for (key, value) in &expected {
                    let data = logger.retrieve(key).await?;
                    assert_eq!(&data.value, value);
                    assert_eq!(&data.key, key);
                }
            }
            done.notify(1);
            // Wait for the server task to finish before starting the next one.
            task.await;
        };

        // Spawn another server, retrieving the previous data and ensure it is the same.
        {
            let (logger, done, task) = spawn_server(&server_name, base.paths.clone()).await?;
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
            done.notify(1);
            task.await;
        }
        Ok(())
    }
}