modio_logger_db/datastore/ds.rs

1// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
2// SPDX-License-Identifier: AGPL-3.0-or-later
3use super::{raw, TagType};
4use crate::buffer::fxtime;
5use crate::buffer::{TimeStatus, VecBuffer};
6use crate::types::{Metric, Statistics, TXMetric};
7use crate::Error;
8use async_std::sync::Mutex;
9use async_std::task;
10use radix_trie::{Trie, TrieCommon};
11use sqlx::SqlitePool;
12use std::sync::Arc;
13use tempfile::NamedTempFile;
14use tracing::instrument;
15use tracing::{debug, error, info, warn};
16
17#[derive(Clone)]
18pub struct Datastore {
19    pool: SqlitePool,
20    buffer: VecBuffer,
21    names: Arc<Mutex<Trie<String, ()>>>,
22    drop_event: Arc<tokio::sync::Notify>,
23}
24
25// To avoid printing out _all_ the spam in drop_event, we implement a custom Debug here.
26impl std::fmt::Debug for Datastore {
27    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28        let opts = &self.pool.options();
29        let mut d = f.debug_struct("Datastore");
30        d.field("pool.size", &self.pool.size())
31            .field("pool.num_idle", &self.pool.num_idle())
32            .field("pool.is_closed", &self.pool.is_closed())
33            .field("pool.acquire_timeout", &opts.get_acquire_timeout())
34            .field("buffer", &self.buffer);
35        match self.names.try_lock() {
36            Some(guard) => {
37                d.field("names", &guard.len());
38            }
39            _ => {
40                d.field("names", &"<Locked>");
41            }
42        }
43        d.finish_non_exhaustive()
44    }
45}
46
47const MAX_PERSIST_AGE: f64 = 57.0;
48
49impl Datastore {
    /// Construct a datastore.
    /// Takes a pre-configured and migrated DB pool and opens a temporary buffer.
52    ///
53    /// # Errors
54    ///    Any `SQLx` Error is returned.
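    ///
    /// # Example
    /// A minimal sketch using the crate's `SqlitePoolBuilder` (the database path is
    /// illustrative):
    /// ```ignore
    /// use crate::db::SqlitePoolBuilder;
    ///
    /// let pool = SqlitePoolBuilder::new()
    ///     .db_path("/var/lib/modio/database.sqlite")
    ///     .migrate(true)
    ///     .build()
    ///     .await?;
    /// let ds = Datastore::new(pool).await?;
    /// ```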
55    pub async fn new(pool: SqlitePool) -> Result<Self, Error> {
56        let buffer = VecBuffer::new();
57        let drop_event = Self::drop_event(pool.clone(), buffer.clone(), None);
58        let names = Arc::new(Mutex::new(Trie::new()));
59        let res = Self {
60            pool,
61            buffer,
62            names,
63            drop_event,
64        };
65        Ok(res)
66    }
67
68    #[must_use]
69    pub fn pool(&self) -> SqlitePool {
70        self.pool.clone()
71    }
72    #[cfg(test)]
73    async fn get_name(&self, key: &str) -> Result<String, Error> {
74        raw::get_name(&self.pool, key).await
75    }
76
77    #[cfg(test)]
78    async fn check_tempdata(&self) -> Result<(), Error> {
79        if let Some(oldest) = self.buffer.oldest() {
            info!("Oldest value is {}", oldest);
81        } else {
82            info!("Buffer was locked or otherwise occupied.");
83        }
84        Ok(())
85    }
86    #[cfg(test)]
87    async fn sensor_id(&self, key: &str) -> Result<i64, Error> {
88        self.add_sensor(key).await?;
89        raw::sensor_id(&self.pool, key).await
90    }
    /// Helper: count the remaining transactions in the table.
92    ///
93    /// # Errors
94    ///
95    /// Will return Err in case of database errors (connections, not existing, etc)
96    #[cfg(test)]
97    pub(crate) async fn count_transactions(&self) -> Result<i32, Error> {
98        let count = raw::count_transactions(&self.pool).await?;
99        Ok(count)
100    }
101
102    /// Return statistics for the database, meant to be used for logging and similar.
103    ///
104    /// # Errors
105    ///
106    /// Will return Err in case of database errors (connections, not existing, etc)
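    ///
    /// A sketch of typical use (`buffered` is refreshed from the in-memory buffer):
    /// ```ignore
    /// let stats = ds.get_statistics().await?;
    /// info!("buffered datapoints: {}", stats.buffered);
    /// ```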
107    #[instrument]
108    pub async fn get_statistics(&self) -> Result<Statistics, Error> {
109        let mut res = raw::get_statistics(&self.pool).await?;
110        // Replace the "buffered" value with a fresh count, mapping "None" (locked, etc) to -1, and
111        // casting the usize result to i32
112        res.buffered = self.buffer.count().map_or(-1, |v| v as i32);
113        Ok(res)
114    }
115    /// Insert a single data-point into the database or buffer
116    ///
117    /// # Errors
118    ///   Any SQL error.
119    #[instrument(level = "debug", skip(value, time, timefail))]
120    pub async fn insert(
121        &self,
122        key: &str,
123        value: &str,
124        time: f64,
125        timefail: bool,
126    ) -> Result<(), Error> {
127        let arf = vec![Metric {
128            name: key.into(),
129            value: value.into(),
130            time,
131        }];
132        self.insert_bulk(arf, timefail).await
133    }
134
135    /// Insert a batch of data-points into the buffer store
136    ///
137    /// # Errors
138    ///   Any SQL error.
139    async fn insert_bulk_buffer(&self, data: Vec<Metric>, status: TimeStatus) -> Result<(), Error> {
140        for metric in data {
141            self.buffer.add_metric(metric, status).await;
142        }
143        Ok(())
144    }
145    #[instrument(level = "debug")]
146    async fn add_sensor(&self, key: &str) -> Result<(), Error> {
147        {
148            let mut guard = self.names.lock().await;
149            if guard.get(key).is_none() {
150                raw::add_sensor(&self.pool, key).await?;
151                guard.insert(key.to_string(), ());
152            }
153        }
154        Ok(())
155    }
156
157    /// Insert a batch of data-points into the datastore
158    /// Stores all the keys to permanent storage, but may buffer actual log values.
159    ///
160    /// # Errors
161    ///   Any SQL error.
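    ///
    /// A sketch using the crate's `Metric` type (names, values and the timefail flag are
    /// illustrative):
    /// ```ignore
    /// let batch = vec![
    ///     Metric { name: "test.test.one".into(), value: "1".into(), time: fxtime() },
    ///     Metric { name: "test.test.two".into(), value: "2".into(), time: fxtime() },
    /// ];
    /// ds.insert_bulk(batch, false).await?;
    /// ```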
162    #[instrument(level = "debug", skip(data))]
163    pub async fn insert_bulk(&self, data: Vec<Metric>, timefail: bool) -> Result<(), Error> {
164        // Start by adding all the names.
165        for metric in &data {
166            self.add_sensor(&metric.name).await?;
167        }
168        let status = TimeStatus::from_bool(timefail);
169        self.insert_bulk_buffer(data, status).await?;
170        Ok(())
171    }
    /// Check if we should persist data to disk according to a fill-count heuristic
173    ///
174    /// # Errors
175    /// Any SQL errors are returned.
176    #[instrument]
177    pub async fn should_persist(&self) -> Result<bool, Error> {
178        const CUTOFF: usize = 100;
179        #[allow(clippy::option_if_let_else)]
180        if let Some(num) = self.buffer.count() {
181            Ok(num > CUTOFF)
182        } else {
183            warn!("Failed to count data in buffer, assuming full.");
184            Ok(true)
185        }
186    }
187
    /// Check if we should persist data to disk according to an age heuristic
189    ///
190    /// # Errors
191    /// Any SQL errors are returned.
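    ///
    /// A sketch of a periodic flush loop a caller might run (the interval is illustrative):
    /// ```ignore
    /// loop {
    ///     task::sleep(Duration::from_secs(10)).await;
    ///     if ds.should_persist().await? || ds.should_persist_age().await? {
    ///         ds.persist_data().await?;
    ///     }
    /// }
    /// ```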
192    #[instrument]
193    pub async fn should_persist_age(&self) -> Result<bool, Error> {
194        let now = fxtime();
195        #[allow(clippy::option_if_let_else)]
196        match self.buffer.oldest() {
197            Some(eldest) => {
198                let age = now - eldest;
199                debug!("Oldest value in buffer is {} ( age: {}s )", &eldest, &age);
200                Ok(age > MAX_PERSIST_AGE)
201            }
202            None => {
203                error!("Buffer locked, assuming we should persist");
204                Ok(true)
205            }
206        }
207    }
208    // Persist data to disk
209    #[instrument(level = "info", skip(self))]
210    pub async fn persist_data(&self) -> Result<(), Error> {
211        let delete_buffer = self.buffer.consume_metrics().await;
212        debug!("Persisting data, count={}", delete_buffer.len());
213        raw::persist_data(&self.pool, &delete_buffer)
214            .await
215            .inspect_err(|e| error!("Failed to persist, err={e:?}, pool={:?}", &self.pool))?;
216        Ok(())
217    }
218
    // Maybe persist data to disk based on age
220    #[instrument(level = "info", skip(self))]
221    pub async fn maybe_persist_data(&self) -> Result<(), Error> {
222        if self.should_persist_age().await? {
223            let delete_buffer = self.buffer.consume_metrics().await;
224            if let Err(e) = raw::persist_data(&self.pool, &delete_buffer).await {
225                // Re-insert the data into the buffer.
226                self.buffer.add_metrics(delete_buffer).await;
227                warn!("Failed to persist data, continuing. err={:?}", e);
228            };
229        }
230        Ok(())
231    }
232
    // When the datastore is dropped, Drop is called in a sync context, which may not have
    // async features available, giving us a headache.
    //
    // This function spawns a task that waits for the notifier telling us to shut down and
    // save the buffer, and only exits after that.
    //
    // By signalling the drop event during Drop, we can fake a callback style of async code
    // that writes data to disk when the Datastore is deallocated.
241    fn drop_event(
242        pool: SqlitePool,
243        buffer: VecBuffer,
244        dbfile: Option<Arc<NamedTempFile>>,
245    ) -> Arc<tokio::sync::Notify> {
246        let notify = Arc::new(tokio::sync::Notify::new());
247        let waiting = notify.clone();
248        task::spawn(async move {
249            waiting.notified().await;
250            info!(
251                "Persisting Buffered data to disk due to deallocation, file={:?}",
252                dbfile
253            );
254            let delete_buffer = buffer.consume_metrics().await;
255            if !delete_buffer.is_empty() {
256                debug!("Persisting count={} buffered values", delete_buffer.len());
257                if let Err(e) = raw::persist_data(&pool, &delete_buffer).await {
258                    error!(
259                        "Failed to write to database {:?} {:?} file={:?}",
260                        &pool, e, &dbfile
261                    );
262                    if let Some(fil) = dbfile {
263                        error!("File: exists: {:?}", fil.path().try_exists());
264                        error!("File: metadata: {:?}", fil.path().metadata());
265                    }
266                    panic!("Failed to persist data {e}");
267                }
268            };
            info!("Closing pool after persisting buffered data.");
270            pool.close().await;
271        });
272        notify
273    }
274}
275
276impl Drop for Datastore {
277    fn drop(&mut self) {
278        let size = self.pool.size();
279        let closed = self.pool.is_closed();
280        debug!("Dropping datastore size={size}, closed={closed}");
        // As drop is always called in a sync context, and it is hairy to find an async
        // executor from here that would let us write buffered data to disk, we instead have a
        // task waiting since the Datastore was created, which we notify here. That task then
        // does the job of writing data to disk and closing files properly.
285        self.drop_event.notify_waiters();
286    }
287}
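
// Shutdown sequence (a sketch of the intended flow, tying the pieces above together):
// 1. `new()` / `new_tempfile()` spawn the drop_event task, which parks on `notified()`.
// 2. Dropping a `Datastore` calls `notify_waiters()` from the sync `Drop` impl.
// 3. The parked task wakes, drains the buffer with `consume_metrics()`, persists it,
//    and finally closes the pool.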
288
289// Batches of data
290impl Datastore {
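    /// Fetch up to `size` persisted metrics for transmission.
    ///
    /// A sketch of a submit cycle, mirroring the `transmit_drop` test below:
    /// ```ignore
    /// let batch = ds.get_batch(50).await?;
    /// let ids: Vec<i64> = batch.iter().map(|m| m.id).collect();
    /// // ... transmit the batch upstream ...
    /// ds.drop_batch(&ids).await?;
    /// ```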
291    #[instrument]
292    pub async fn get_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
293        let res = raw::get_batch(&self.pool, size).await?;
294        if res.len() >= (size as usize) {
295            return Ok(res);
296        }
        // If we have fewer than `size` metrics in our datastore,
        // we _may_ persist some data to disk.
        //
        // We do not wish to transmit data without storing it first,
        // but we also do not want a loop of:
        //    buffer: 1 datapoint, to send: 0 datapoints
        //    submit calls "get batch"
        //    write buffer to disk, hand it to submit
        //    submit stores the result of that work (1 datapoint)
        //    submit sees "we had data", asks for data again
        //    write buffer to disk, ...
        //
        // As that would ruin our disk performance, we only consider writing the buffer
        // when we do not have enough metrics to hand out, and the buffer should be synced
        // anyway (time or fill-rate).
312        self.maybe_persist_data().await?;
313        let res = raw::get_batch(&self.pool, size).await?;
314        Ok(res)
315    }
316
317    #[instrument]
318    pub async fn get_internal_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
319        let res = raw::get_internal_batch(&self.pool, size).await?;
320        // See the comment in get_batch for why we check the count like this.
321        if res.len() >= (size as usize) {
322            return Ok(res);
323        }
324        self.maybe_persist_data().await?;
325        let res = raw::get_internal_batch(&self.pool, size).await?;
326        Ok(res)
327    }
328
329    #[instrument(level = "debug", skip(ids))]
330    pub async fn drop_batch(&self, ids: &[i64]) -> Result<(), Error> {
331        raw::drop_batch(&self.pool, ids).await?;
332        Ok(())
333    }
334
335    // Handling timefail
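    /// Adjust the timestamps of TIMEFAIL-marked datapoints by `adjust` seconds and update
    /// their status so they become visible to the batch getters.
    ///
    /// A sketch, following the `timefail_handling` tests below (`uncertain_time` and
    /// `offset_seconds` are illustrative):
    /// ```ignore
    /// ds.insert("test.test.one", "value", uncertain_time, true).await?;
    /// // ... later, once the clock offset is known ...
    /// ds.fix_timefail(offset_seconds).await?;
    /// ```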
336    #[instrument(level = "debug")]
337    pub async fn fix_timefail(&self, adjust: f32) -> Result<u64, Error> {
338        self.persist_data().await?;
339        let count = raw::fix_timefail(&self.pool, adjust).await?;
340        info!(
341            "Updated TIMEFAIL status for count={} items with offset={}",
342            &count, adjust
343        );
344        Ok(count)
345    }
346    #[instrument(level = "debug")]
347    pub async fn get_last_datapoint(&self, key: &str) -> Result<Metric, Error> {
348        // If the metric is in our buffer, return it early and do not cause extra write to disk.
349        if let Some(metric) = self.buffer.get_metric(key).await {
350            return Ok(metric);
351        }
352        let res = raw::get_last_datapoint(&self.pool, key).await?;
353        Ok(res)
354    }
355    // Return the last point for all keys
356    #[instrument(level = "debug", skip_all, fields(self))]
357    pub async fn get_latest_logdata(&self) -> Result<Vec<Metric>, Error> {
        // Even when asked for the latest logdata, we do not return values straight from our
        // internal buffer.
        //
        // Except if the buffered data is about to expire, in which case we persist it first.
362        self.maybe_persist_data().await?;
363        let res = raw::get_latest_logdata(&self.pool).await?;
364        Ok(res)
365    }
366}
367
368// Transaction (changes)
369mod changes {
370    use super::raw;
371    use super::Datastore;
372    use crate::buffer::TimeStatus;
373    use crate::types::Transaction;
374    use crate::Error;
375    use tracing::instrument;
376    use tracing::{debug, info};
377
378    impl Datastore {
379        #[instrument(level = "debug")]
380        pub async fn has_transaction(&self, token: &str) -> Result<bool, Error> {
381            let res = raw::has_transaction(&self.pool, token).await?;
382            Ok(res)
383        }
384
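        /// Queue a change transaction for `key`, recording the expected and target values.
        ///
        /// A sketch of the full lifecycle, as exercised by the tests in this file:
        /// ```ignore
        /// ds.transaction_add("test.test.one", "old", "new", "token-xyz").await?;
        /// let pending = ds.transaction_get("test.test.one").await?;
        /// ds.transaction_pass(pending[0].t_id, false).await?; // or ds.transaction_fail(...)
        /// ```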
385        #[instrument(level = "info", skip(expected, target, token))]
386        pub async fn transaction_add(
387            &self,
388            key: &str,
389            expected: &str,
390            target: &str,
391            token: &str,
392        ) -> Result<(), Error> {
393            let internal_key = format!("mytemp.internal.change.{key}");
394            self.add_sensor(key).await?;
395            self.add_sensor(&internal_key).await?;
396            info!(
397                "Storing transaction {}, '{}' => '{}'",
398                &key, &expected, &target
399            );
400            raw::transaction_add(&self.pool, key, expected, target, token).await?;
401            Ok(())
402        }
403        #[instrument(level = "info")]
404        pub async fn transaction_get(&self, prefix: &str) -> Result<Vec<Transaction>, Error> {
405            let res = raw::transaction_get(&self.pool, prefix).await?;
406            Ok(res)
407        }
408        #[instrument]
409        pub async fn transaction_fail(
410            &self,
411            transaction_id: i64,
412            timefail: bool,
413        ) -> Result<u64, Error> {
414            use crate::datastore::TransactionStatus::Failed;
415            debug!("Failing transaction with id={}", transaction_id);
416            let timefail = TimeStatus::from_bool(timefail);
417            let count = raw::transaction_mark(&self.pool, transaction_id, Failed, timefail).await?;
418            Ok(count)
419        }
420
421        #[instrument]
422        pub async fn transaction_pass(
423            &self,
424            transaction_id: i64,
425            timefail: bool,
426        ) -> Result<u64, Error> {
427            use crate::datastore::TransactionStatus::Success;
428            debug!("Passing transaction with id={}", transaction_id);
429            let timefail = TimeStatus::from_bool(timefail);
430            let count =
431                raw::transaction_mark(&self.pool, transaction_id, Success, timefail).await?;
432            Ok(count)
433        }
434
435        /// Fail all pending transactions
436        #[instrument(level = "info")]
437        pub async fn transaction_fail_pending(&self) -> Result<u64, Error> {
438            let count = raw::transaction_fail_pending(&self.pool).await?;
439            info!("Marked {} pending transactions as failed", count);
440            Ok(count)
441        }
442    }
443}
444
445mod clean {
446    use super::raw;
447    use super::Datastore;
448    use crate::CleanResult;
449    use crate::Error;
450    use sqlx::{Connection, SqlitePool};
451    use tracing::instrument;
452    use tracing::{debug, error};
453
454    impl Datastore {
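        /// Run the periodic maintenance routine against a pool.
        ///
        /// A sketch of how a caller might invoke it, borrowing the pool from an existing
        /// datastore:
        /// ```ignore
        /// let result = Datastore::clean_maintenance(ds.pool()).await?;
        /// ```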
455        #[instrument(level = "info", skip(pool))]
456        pub async fn clean_maintenance(pool: SqlitePool) -> Result<CleanResult, Error> {
457            let mut conn = pool
458                .acquire()
459                .await
460                .inspect_err(|e| error!(err=?e, "Failed to get connection"))?;
461            conn.ping().await?;
            // These functions take a SqliteConnection, as "VACUUM" and "PRAGMA" cannot be run
            // on the pool; they need a raw connection (and no transaction).
464            let trans_failed = raw::fail_queued_transactions(&mut conn)
465                .await
466                .inspect_err(|e| error!("Failed to mark queued transactions. err={:?}", e))
467                .unwrap_or(-1);
468            let trans_old = raw::delete_old_transactions(&mut conn)
469                .await
470                .inspect_err(|e| error!("Failed to delete old transactions. err={:?}", e))
471                .unwrap_or(-1);
472            let data_deleted = raw::delete_old_logdata(&mut conn)
473                .await
474                .inspect_err(|e| error!("Failed to delete old logdata. err={:?}", e))
475                .unwrap_or(-1);
476            raw::need_vacuum_or_shrink(&mut conn)
477                .await
478                .inspect_err(|e| error!("Failed to vacuum / shrink. err={:?}", e))?;
479            conn.close().await?;
480            let res = CleanResult {
481                trans_failed,
482                trans_old,
483                data_deleted,
484            };
485            Ok(res)
486        }
487
488        #[cfg(test)]
489        /// Wrapper of delete_old_transactions_raw, for use with tests.
490        pub(crate) async fn delete_old_transactions(&self) -> Result<i32, Error> {
491            debug!("delete_old_transactions, grabbing pool connection");
492            let mut conn = self
493                .pool
494                .acquire()
495                .await
496                .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
497            let res = raw::delete_old_transactions(&mut conn).await?;
498            conn.close().await?;
499            Ok(res)
500        }
501
502        /// Wrapper of fail_queued_transactions_raw
503        #[instrument]
504        pub async fn fail_queued_transactions(&self) -> Result<i32, Error> {
505            debug!("fail_queued_transactions, grabbing pool connection");
506            let mut conn = self
507                .pool
508                .acquire()
509                .await
510                .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
511            let res = raw::fail_queued_transactions(&mut conn).await?;
512            conn.close().await?;
513            Ok(res)
514        }
515
516        /// Wrapper of the private delete_old_logdata_raw for use with tests.
517        #[cfg(test)]
518        pub(crate) async fn delete_old_logdata(&self) -> Result<i32, Error> {
519            debug!("delete_old_logdata, grabbing pool connection");
520            let mut conn = self
521                .pool
522                .acquire()
523                .await
524                .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
525            let res = raw::delete_old_logdata(&mut conn).await?;
526            conn.close().await?;
527            Ok(res)
528        }
529
530        /// Delete random data
531        #[cfg(test)]
532        pub async fn delete_random_data(&self) -> Result<i32, Error> {
533            debug!("delete_random_data, grabbing pool connection");
534            let mut conn = self
535                .pool
536                .acquire()
537                .await
538                .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
539            let res = raw::delete_random_data(&mut conn).await?;
540            conn.close().await?;
541            Ok(res)
542        }
543    }
544}
545
546impl Datastore {
547    /// Create a new datastore with a new temporary file
548    ///
    /// # Panics
    ///   Panics if the temporary file cannot be created or the database pool cannot be built.
551    pub async fn temporary() -> Self {
552        let dbfile = tempfile::Builder::new()
553            .prefix("database")
554            .suffix(".sqlite")
555            .tempfile()
556            .expect("Error on tempfile");
557        Self::new_tempfile(dbfile.into()).await
558    }
559
560    /// Create a new datastore from an existing temporary file
561    ///
    /// # Panics
    ///   Panics if the database pool cannot be built or migrated.
564    pub async fn new_tempfile(dbfile: Arc<NamedTempFile>) -> Self {
565        use crate::db::SqlitePoolBuilder;
566        let pool = SqlitePoolBuilder::new()
567            .db_path(dbfile.path())
568            .migrate(true)
569            .build()
570            .await
571            .expect("Failed to build pool");
572
573        let buffer = VecBuffer::new();
574        let drop_event = Self::drop_event(pool.clone(), buffer.clone(), Some(dbfile));
575        let names = Arc::new(Mutex::new(Trie::new()));
576        Self {
577            pool,
578            buffer,
579            names,
580            drop_event,
581        }
582    }
583}
584
585#[cfg(test)]
586fn metrc(name: &str, value: &str, time: f64) -> crate::Metric {
587    crate::Metric {
588        name: name.into(),
589        value: value.into(),
590        time,
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use super::*;
597    use crate::db::SqlitePoolBuilder;
598    use std::error::Error;
599    use test_log::test;
600    use timeout_macro::timeouttest;
601    type TestResult = Result<(), Box<dyn Error>>;
602
603    #[test(timeouttest)]
604    async fn has_file_database() -> TestResult {
605        let tempfile = tempfile::Builder::new()
606            .prefix("loggerdb_has_file_database")
607            .suffix(".sqlite")
608            .tempfile()?;
609        let named_path = tempfile.path();
610        let pool = SqlitePoolBuilder::new()
611            .db_path(named_path)
612            .migrate(true)
613            .build()
614            .await
615            .unwrap();
616        sqlx::query("SELECT * FROM sensor where sensor.name = 'mytemp.internal.sensors'")
617            .fetch_one(&pool)
618            .await?;
619        // Close the pool so we do not leave sqlite's .shm and .wal files around
620        pool.close().await;
621        drop(pool);
622        Ok(())
623    }
624
625    #[test(timeouttest)]
626    async fn datastore_tempdata() {
627        let ds = Datastore::temporary().await;
628        ds.check_tempdata()
629            .await
630            .expect("Should be able to get data from temp table");
631    }
632
633    #[test(timeouttest)]
634    async fn datastore_names() -> TestResult {
635        let ds = Datastore::temporary().await;
636        let res = ds.get_name("mytemp.internal.sensors").await?;
637        assert_eq!(res, "mytemp.internal.sensors");
638        Ok(())
639    }
640
641    #[test(timeouttest)]
642    async fn add_one_name() -> TestResult {
643        let ds = Datastore::temporary().await;
644        let res = ds.sensor_id("test.test.test").await?;
645        assert_eq!(res, 2);
646        Ok(())
647    }
648
649    #[test(timeouttest)]
650    async fn add_two_names() -> TestResult {
651        let ds = Datastore::temporary().await;
652        let res = ds.sensor_id("test.test.test").await?;
653        assert_eq!(res, 2);
654        let res = ds.sensor_id("test.test.test").await?;
655        assert_eq!(res, 2);
656        let res = ds.sensor_id("test.test.test").await?;
657        assert_eq!(res, 2);
658        let res = ds.get_name("test.test.test").await?;
659        assert_eq!(res, "test.test.test");
660        Ok(())
661    }
662
663    #[test(timeouttest)]
664    async fn insert_timefail() -> TestResult {
665        let ds = Datastore::temporary().await;
666        ds.insert("test.test.ok", "value", 1_620_850_252.0, false)
667            .await?;
668        ds.insert("test.test.ok", "value1", 1_620_850_253.0, false)
669            .await?;
670        ds.insert("test.test.ok", "value2", 1_620_850_255.0, false)
671            .await?;
        ds.insert("test.test.timefail", "value", 1_620_850_252.0, true)
673            .await?;
674        Ok(())
675    }
676
677    #[test(timeouttest)]
678    async fn retrieve_last_empty() -> TestResult {
679        let ds = Datastore::temporary().await;
680        let pool = ds.pool();
681        ds.insert("test.test.ok", "value", 1_620_850_252.0, false)
682            .await?;
683        let before_sync = ds.get_last_datapoint("test.test.ok").await;
684        assert!(before_sync.is_ok(), "Should have a value");
685
686        ds.persist_data().await?;
687
688        let after_sync = ds.get_last_datapoint("test.test.ok").await;
689        assert!(after_sync.is_ok(), "Should have a value");
690
691        let delete_all = sqlx::query("DELETE FROM logdata");
692        delete_all.execute(&pool).await?;
693
694        let after = ds.get_last_datapoint("test.test.ok").await;
695        assert!(after.is_err(), "Should fail");
696        Ok(())
697    }
698
699    #[test(timeouttest)]
700    async fn retrieve_sorting() -> TestResult {
701        let ds = Datastore::temporary().await;
702        ds.insert("test.test.one", "value0", 1_620_850_000.0, false)
703            .await?;
704        ds.insert("test.test.one", "value1", 1_620_850_111.0, true)
705            .await?;
706        ds.insert("test.test.one", "value3", 1_620_850_222.0, false)
707            .await?;
708        ds.insert("test.test.two", "value3", 1_620_850_333.0, true)
709            .await?;
710        ds.insert("test.test.two", "value1", 1_620_850_222.0, false)
711            .await?;
712        ds.insert("test.test.two", "value0", 1_620_850_111.0, true)
713            .await?;
714        let res = ds.get_last_datapoint("test.test.two").await?;
715        assert_eq!(res.name, "test.test.two");
716        assert_eq!(res.value, "value3");
717        assert_eq!(res.time, 1_620_850_333.0);
718
719        let res = ds.get_last_datapoint("test.test.one").await?;
720        assert_eq!(res.name, "test.test.one");
721        assert_eq!(res.value, "value3");
722        assert_eq!(res.time, 1_620_850_222.0);
723        Ok(())
724    }
725
726    #[test(timeouttest)]
727    async fn insert_bulk() -> TestResult {
728        let ds = Datastore::temporary().await;
729
730        let vals = vec![
731            metrc("test.test.one", "etta", 16_208_501_111.0),
732            metrc("test.test.two", "etta", 16_208_501_111.0),
733            metrc("test.test.three", "etta", 16_208_501_112.0),
734            metrc("test.test.one", "tvåa", 16_208_502_222.0),
735            metrc("test.test.two", "tvåa", 16_208_502_223.0),
736            metrc("test.test.three", "tvåa", 16_208_502_222.0),
737            metrc("test.test.one", "trea", 16_208_503_333.0),
738            metrc("test.test.two", "trea", 16_208_503_331.0),
739            metrc("test.test.three", "trea", 16_208_503_333.0),
740            metrc("test.test.one", "fyra", 16_208_504_444.0),
741        ];
742        ds.insert_bulk(vals, true).await?;
743        let res = ds.get_last_datapoint("test.test.one").await?;
744        assert_eq!(res.value, "fyra");
745        let res = ds.get_last_datapoint("test.test.three").await?;
746        assert_eq!(res.value, "trea");
747        let res = ds.get_last_datapoint("test.test.two").await?;
748        assert_eq!(res.value, "trea");
749        Ok(())
750    }
751
752    #[test(timeouttest)]
753    async fn insert_persist() -> TestResult {
754        let ds = Datastore::temporary().await;
755
756        let vals = vec![
757            metrc("test.test.one", "etta", 16_208_501_111.0),
758            metrc("test.test.two", "etta", 16_208_501_111.0),
759            metrc("test.test.three", "etta", 16_208_501_112.0),
760        ];
        // Generate a bulk of 1000 values for one key
762        let seq = 0..1000;
763        let more: Vec<Metric> = seq
764            .map(|x| metrc("test.test.three", &format!("{x}"), fxtime()))
765            .collect();
766
767        ds.insert_bulk(vals, true).await?;
768        let first = ds.should_persist().await?;
769        assert!(!first, "Should not need persist with only 3 values");
770
771        let should = ds.buffer.get_metric("test.test.one").await.is_some();
772        assert!(
773            should,
            "test.test.one should still be in the buffer before persisting"
775        );
776
777        // Insert the bulk of the data
778        ds.insert_bulk(more, true).await?;
779        let second = ds.should_persist().await?;
780        assert!(second, "Should need persist with more values");
781
782        // Check that persist works
783        let res = ds.persist_data().await;
        assert!(res.is_ok(), "We should have successfully persisted data");
785
786        // We should no longer need persist
787        let third = ds.should_persist().await?;
788        assert!(!third, "We should not need persist again");
789
790        let should = ds.buffer.get_metric("test.test.one").await.is_some();
791        assert!(!should, "This key should no longer be in the buffer");
792
793        // Calling persist on an empty set should be ok.
794        ds.persist_data().await?;
795        Ok(())
796    }
797
798    #[test(timeouttest)]
799    async fn persist_delete_and_purge() -> TestResult {
800        let ds = Datastore::temporary().await;
801        let vals: Vec<Metric> = (0..1000)
802            .map(|x| {
803                metrc(
804                    &format!("test.test.bulk.{}", x % 7),
805                    &x.to_string(),
806                    fxtime(),
807                )
808            })
809            .collect();
810        ds.insert_bulk(vals, true).await?;
811        let should_count = ds.should_persist().await?;
812        let should_age = ds.should_persist_age().await?;
813        assert!(should_count, "Should persist based on count");
        assert!(!should_age, "Oldest value should not be old enough to require a persist");
815        ds.persist_data().await?;
816        let count = ds.delete_random_data().await?;
817        assert!(count > 0, "Should have deleted some data");
818        Ok(())
819    }
820
821    #[test(timeouttest)]
822    async fn delete_old_logged_values() -> TestResult {
823        let ds = Datastore::temporary().await;
824        ds.insert("test.test.ok", "one", 1_620_850_000.0, false)
825            .await?;
826        ds.insert("test.test.ok", "two", 1_999_950_251.0, false)
827            .await?;
828
829        // We should have a value
830        let metric = ds.get_last_datapoint("test.test.ok").await?;
831        assert_eq!(metric.value, "two");
832
        // Submitter comes and does its stuff here
834        let to_xmit = ds.get_batch(50).await?;
835        let mut to_remove = Vec::<i64>::with_capacity(5);
836        for i in to_xmit {
837            to_remove.push(i.id);
838        }
839        ds.drop_batch(&to_remove).await?;
840        //  End submitter part
841
842        // We should still have a value after this
843        let metric = ds.get_last_datapoint("test.test.ok").await?;
844        assert_eq!(metric.value, "two");
845
846        // Cleanout routine should run on the device
847        let count = ds.delete_old_logdata().await?;
848        assert_eq!(count, 1);
849
850        // After dropping data, we should still be able to get the last value out.
851        let metric = ds.get_last_datapoint("test.test.ok").await?;
852        assert_eq!(metric.value, "two");
853
854        // Cleanout routine should run on the device
855        let count = ds.delete_old_logdata().await?;
856        // We should not have removed any rows now.
857        assert_eq!(count, 0);
858
859        Ok(())
860    }
861
    /// Inserting multiple items and then asking for "latest" should return one point for each
    /// key and nothing more.
864    #[test(timeouttest)]
865    async fn get_latest_datapoints() -> TestResult {
866        let ds = Datastore::temporary().await;
867
868        let vals = vec![
869            metrc("test.test.one", "etta", 1_620_850_111.0),
870            metrc("test.test.two", "etta", 1_620_850_111.0),
871            metrc("test.test.three", "etta", 1_620_850_111.0),
872            metrc("test.test.one", "tvåa", 1_620_850_222.0),
873            metrc("test.test.two", "tvåa", 1_620_850_222.0),
874            metrc("test.test.three", "tvåa", 1_620_850_222.0),
875            metrc("test.test.one", "trea", 1_620_850_333.0),
876            metrc("test.test.three", "trea", 1_620_850_333.0),
877            metrc("test.test.one", "fyra", 1_620_850_444.0),
878        ];
879        ds.insert_bulk(vals, true).await?;
880        ds.persist_data().await?;
881        let res = ds.get_latest_logdata().await?;
882        assert_eq!(res[0].name, "test.test.two");
883        assert_eq!(res[0].value, "tvåa");
884        assert_eq!(res[1].name, "test.test.three");
885        assert_eq!(res[1].value, "trea");
886        assert_eq!(res[2].name, "test.test.one");
887        assert_eq!(res[2].value, "fyra");
888        assert_eq!(res.len(), 3);
889        Ok(())
890    }
891
892    /// Multiple values for a key should persist if the oldest is too old.
893    #[test(timeouttest)]
894    async fn get_latest_datapoints_persists() -> TestResult {
895        let ds = Datastore::temporary().await;
896
897        let vals = vec![
898            metrc("test.test.one", "etta", fxtime() - MAX_PERSIST_AGE),
899            metrc("test.test.one", "tvåa", fxtime() - MAX_PERSIST_AGE / 2.0),
900            metrc("test.test.one", "trea", fxtime() - 1.0),
901        ];
902        ds.insert_bulk(vals, true).await?;
903        let res = ds.get_latest_logdata().await?;
904        assert_eq!(res[0].name, "test.test.one");
905        assert_eq!(res[0].value, "trea");
906        assert_eq!(res.len(), 1);
907
908        // If the data in the buffer is "too fresh" it should not be persisted by the call to
909        // get_latest_logdata() and we should thus get the previous value ("trea")
910        let vals = vec![metrc("test.test.one", "fyra", fxtime())];
911        ds.insert_bulk(vals, true).await?;
912        let res = ds.get_latest_logdata().await?;
913        assert_eq!(res[0].name, "test.test.one");
914        assert_eq!(res[0].value, "trea");
915        assert_eq!(res.len(), 1);
916
917        // After a persist, we should get the latest value
918        ds.persist_data().await?;
919        let res = ds.get_latest_logdata().await?;
920        assert_eq!(res[0].name, "test.test.one");
921        assert_eq!(res[0].value, "fyra");
922        assert_eq!(res.len(), 1);
923
924        Ok(())
925    }
926
927    #[test(timeouttest)]
928    async fn get_transactions() -> TestResult {
929        let ds = Datastore::temporary().await;
930
931        let vals = vec![
932            metrc("test.test.one", "etta", 16_208_501_111.0),
933            metrc("test.test.two", "tvåa", 16_208_504_444.0),
934        ];
935        ds.insert_bulk(vals, false).await?;
936        ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
937            .await?;
938        let res = ds.transaction_get("test.test.one").await?;
939        assert_eq!(res.len(), 1, "Expecting only one result");
940        assert_eq!(
941            res[0].name, "test.test.one",
942            "Expecting key to match added transaction"
943        );
        assert_eq!(res[0].expected, "etta", "Expected value mismatch");
945        assert_eq!(res[0].target, "ettatvåa", "Target value mismatch");
946        assert_eq!(res[0].t_id, 1, "Transaction ID mismatch");
947        Ok(())
948    }
949
950    async fn with_keys() -> Datastore {
951        let ds = Datastore::temporary().await;
952        let vals = vec![
953            metrc("test.test.one", "etta", 16_208_501_111.0),
954            metrc("test.test.two", "etta", 16_208_501_111.0),
955            metrc("test.test.three", "etta", 16_208_501_112.0),
956            metrc("test.test.two", "tvåa", 16_208_502_223.0),
957            metrc("test.test.three", "tvåa", 16_208_502_222.0),
958            metrc("test.test.three", "trea", 16_208_503_333.0),
959        ];
960
961        ds.insert_bulk(vals, false).await.unwrap();
962        ds
963    }
964
965    #[test(timeouttest)]
966    async fn transmit_drop() -> TestResult {
967        let ds = with_keys().await;
968        ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, false)
969            .await?;
970        ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, true)
971            .await?;
972        ds.persist_data().await?;
973        let to_xmit = ds.get_batch(50).await?;
974        let mut to_remove = Vec::<i64>::with_capacity(50);
975        for i in to_xmit {
976            to_remove.push(i.id);
977        }
978        ds.drop_batch(&to_remove).await?;
979        let second = ds.get_batch(10).await?;
980        assert_eq!(second.len(), 0, "No elements should remain");
981        let third = ds.get_internal_batch(5).await?;
982        assert_eq!(
983            third.len(),
984            1,
985            "Expecting an internal value, because one is timefailed."
986        );
987        Ok(())
988    }
989    #[test(timeouttest)]
990    async fn timefail_handling_internal() -> TestResult {
991        let ds = with_keys().await;
992        ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, true)
993            .await?;
994        ds.insert("modio.test.one", "modio-ett", 16_208_502_221.0, true)
995            .await?;
996        let res = ds.get_internal_batch(10).await?;
997        assert_eq!(res.len(), 0);
998        ds.fix_timefail(10.0).await?;
999        let res = ds.get_internal_batch(10).await?;
1000        assert_eq!(res.len(), 2);
1001        Ok(())
1002    }
1003
1004    #[test(timeouttest)]
1005    async fn timefail_handling() -> TestResult {
1006        let ds = with_keys().await;
1007        ds.insert("test.test.one", "modio-ett", 1_620_850_222.0, true)
1008            .await?;
1009        ds.insert("test.test.two", "modio-två", 1_620_850_222.0, true)
1010            .await?;
1011        let res = ds.get_batch(10).await?;
1012        assert_eq!(res.len(), 6);
1013        ds.fix_timefail(10.0).await?;
1014        let res = ds.get_batch(10).await?;
1015        assert_eq!(res.len(), 8);
1016        Ok(())
1017    }
1018
1019    #[test(timeouttest)]
1020    async fn fail_transactions() -> TestResult {
1021        use serde::Deserialize;
1022        #[derive(Deserialize)]
1023        struct TransJson {
1024            id: String,
1025            clock: i64,
1026            status: String,
1027        }
1028        let ds = with_keys().await;
1029
1030        ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1031            .await?;
1032        ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1033            .await?;
1034        let first = ds.transaction_get("test.test.one").await?;
1035        assert_eq!(first.len(), 1, "Expecting only one result");
1036        assert_eq!(
1037            first[0].name, "test.test.one",
1038            "Expecting key to match added transaction"
1039        );
1040        ds.transaction_fail(first[0].t_id, false).await?;
1041        let logrow = ds
1042            .get_last_datapoint("mytemp.internal.change.test.test.one")
1043            .await?;
1044        let v1: TransJson = serde_json::from_str(&logrow.value)?;
1045        assert_eq!(v1.status, "FAILED");
1046        assert_eq!(v1.id, "xxxXXxx");
1047        assert!(v1.clock > 0);
1048
1049        let second = ds.transaction_get("test.test.one").await?;
1050        assert_eq!(second.len(), 0, "Should not have pending transactions");
1051
1052        let third = ds.transaction_get("test.test").await?;
1053        assert_eq!(third.len(), 1, "Should have pending for test.test.two");
1054        assert_eq!(
1055            third[0].name, "test.test.two",
1056            "Expecting key to match transaction two"
1057        );
1058        Ok(())
1059    }
1060    #[test(timeouttest)]
1061    async fn empty_fetch() {
1062        let ds = with_keys().await;
1063        let res = ds.get_last_datapoint("abc.def.ghi").await;
1064        assert!(res.is_err(), "Should have an error from absent keys");
1065    }
1066
1067    #[test(timeouttest)]
1068    async fn pass_transactions() -> TestResult {
1069        use serde::Deserialize;
1070        let ds = with_keys().await;
1071
1072        #[derive(Deserialize)]
1073        struct TransJson {
1074            id: String,
1075            clock: i64,
1076            status: String,
1077        }
1078
1079        ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1080            .await?;
1081        ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1082            .await?;
1083        let first = ds.transaction_get("test.test.one").await?;
1084
1085        assert_eq!(first.len(), 1, "Expecting only one result");
1086        assert_eq!(
1087            first[0].name, "test.test.one",
1088            "Expecting key to match added transaction"
1089        );
1090        ds.transaction_pass(first[0].t_id, false).await?;
1091        let logrow = ds
1092            .get_last_datapoint("mytemp.internal.change.test.test.one")
1093            .await?;
1094        let v1: TransJson = serde_json::from_str(&logrow.value)?;
1095        assert_eq!(v1.status, "SUCCESS");
1096        assert_eq!(v1.id, "xxxXXxx");
1097        assert!(v1.clock > 0);
1098
1099        let second = ds.transaction_get("test.test.one").await?;
1100        assert_eq!(second.len(), 0, "Should not have pending transactions");
1101
1102        let third = ds.transaction_get("test.test").await?;
1103        assert_eq!(third.len(), 1, "Should have pending for test.test.two");
1104        assert_eq!(
1105            third[0].name, "test.test.two",
1106            "Expecting key to match transaction two"
1107        );
1108        // this row should not exist in the database.
1109        let third_row = ds
1110            .get_last_datapoint("mytemp.internal.change.test.test.two")
1111            .await;
1112        assert!(third_row.is_err());
1113        Ok(())
1114    }
1115
1116    #[test(timeouttest)]
1117    async fn delete_old_transactions() -> TestResult {
1118        let ds = with_keys().await;
1119
1120        ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1121            .await?;
1122        ds.transaction_add("test.test.one", "tvåa", "ettatvåa", "zzZZZzzz")
1123            .await?;
1124        ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1125            .await?;
1126
1127        assert_eq!(ds.count_transactions().await?, 3);
1128        // Get and pass the transaction
1129        let trans = ds.transaction_get("test.test.one").await?;
1130        ds.transaction_pass(trans[0].t_id, false).await?;
1131        ds.transaction_fail(trans[1].t_id, false).await?;
1132
1133        assert_eq!(ds.count_transactions().await?, 3);
1134
        // Submitter comes and does its stuff here
1136        let to_xmit = ds.get_batch(50).await?;
1137        let mut to_remove = Vec::<i64>::with_capacity(5);
1138        for i in to_xmit {
1139            to_remove.push(i.id);
1140        }
1141        ds.drop_batch(&to_remove).await?;
1142        //  End submitter part
1143        assert_eq!(ds.count_transactions().await?, 3);
1144        // Cleanout routine should run on the device
1145        let count = ds.delete_old_transactions().await?;
1146        assert_eq!(count, 1);
1147
1148        // We expect to have 2 remaining transactions still. One "PENDING" for test.test.two, and
1149        // one for "test.test.one" that is in "SUCCESS" or "FAIL" state.
1150        assert_eq!(ds.count_transactions().await?, 2);
1151
1152        // Cleanout routine should run on the device
1153        let count = ds.delete_old_transactions().await?;
1154        // We should not have removed any rows now.
1155        assert_eq!(count, 0);
1156        Ok(())
1157    }
1158
1159    #[test(timeouttest)]
1160    async fn fail_queued_transactions() -> TestResult {
1161        use uuid::Uuid;
1162
1163        let ds = with_keys().await;
1164
1165        for x in 0..18 {
1166            let tok = Uuid::new_v4().hyphenated().to_string();
1167            let val = format!("newval{x}");
1168            ds.transaction_add("test.test.one", "etta", &val, &tok)
1169                .await?;
1170        }
1171        ds.transaction_add("test.test.two", "etta", "ettatvåa", "xxxXXxx")
1172            .await?;
1173        ds.transaction_add("test.test.three", "tvåa", "ettatvåa", "zzZZZzzz")
1174            .await?;
1175        ds.transaction_add("test.test.four", "etta", "ettatvåa", "YYyyyyYYY")
1176            .await?;
1177        ds.transaction_add("test.test.one", "etta", "lasttarget", "xxXXxxxXXxxx")
1178            .await?;
1179        assert_eq!(ds.count_transactions().await?, 22);
1180
1181        // Now run the routine to mark deep queues of transactions to failed
1182        let count = ds.fail_queued_transactions().await?;
1183        assert_eq!(count, 3, "3 should be marked as failed");
1184
1185        ds.delete_old_transactions().await?;
1186        assert_eq!(ds.count_transactions().await?, 19, "19 should exist");
1187
1188        let res = ds.transaction_get("test.test.one").await?;
1189        let lastval = res.last().unwrap();
1190        assert_eq!(
1191            lastval.target, "lasttarget",
1192            "Expecting last transaction to be the last added"
1193        );
1194        Ok(())
1195    }
1196}
1197
1198mod metadata {
1199    use super::{raw, TagType};
1200    use super::{Datastore, Error};
1201    use crate::types::DataType;
1202    use crate::types::Metadata;
1203    use crate::types::SensorMode;
1204    use crate::types::ValueMap;
1205    use std::collections::{BTreeMap, HashMap};
1206    use tracing::info;
1207    use tracing::instrument;
1208
1209    #[derive(Debug)]
1210    enum KeyType {
1211        Modio,
1212        Customer,
1213    }
1214    impl Datastore {
1215        #[instrument]
1216        pub async fn metadata_get_names(&self) -> Result<Vec<Metadata>, Error> {
1217            info!("Requested all names");
1218            let vals = self.metadata_get_tag(TagType::Name).await?;
1219            let res = vals
1220                .into_iter()
1221                .map(|(key, name)| Metadata::builder(key).name(name).build())
1222                .collect();
1223            Ok(res)
1224        }
1225
1226        #[instrument]
1227        pub async fn metadata_get_units(&self) -> Result<Vec<Metadata>, Error> {
1228            info!("Requested all units");
1229            let vals = self.metadata_get_tag(TagType::Unit).await?;
1230            let res = vals
1231                .into_iter()
1232                .map(|(key, unit)| Metadata::builder(key).unit(unit).build())
1233                .collect();
1234            Ok(res)
1235        }
1236
1237        #[instrument]
1238        pub async fn metadata_get_descriptions(&self) -> Result<Vec<Metadata>, Error> {
            info!("Requested all descriptions");
1240            let vals = self.metadata_get_tag(TagType::Description).await?;
1241            let res = vals
1242                .into_iter()
1243                .map(|(key, description)| Metadata::builder(key).description(description).build())
1244                .collect();
1245            Ok(res)
1246        }
1247
1248        #[instrument]
1249        pub async fn metadata_get_enum(&self) -> Result<Vec<Metadata>, Error> {
1250            info!("Requested all value maps");
1251            let vals = self.metadata_get_tag(TagType::Enum).await?;
1252            let res = vals
1253                .into_iter()
1254                .map(|(key, stringy)| Metadata::builder(key).value_map_string(stringy).build())
1255                .collect();
1256            Ok(res)
1257        }
1258        #[instrument(level = "info")]
1259        pub async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, Error> {
1260            let pairs = raw::get_metadata(&self.pool, key).await?;
1261            // Early exit so we don't return an empty Metadata for
1262            // a key that has no metadata
1263            if pairs.is_empty() {
1264                return Ok(None);
1265            }
1266            let mut builder = Metadata::builder(key);
1267            for (tag, value) in pairs {
1268                builder = builder.pair(&tag, value);
1269            }
1270            let res = builder.build();
1271            Ok(Some(res))
1272        }
        // Grab all metadata of the given key type. Internally collects key => Metadata::builder()
        // pairs in a HashMap, then builds them into a Vec<Metadata>.
1274        #[instrument(level = "info")]
1275        async fn get_all_metadata_inner(&self, keytype: KeyType) -> Result<Vec<Metadata>, Error> {
1276            let mut map = HashMap::new();
1277            let mut offset: u32 = 0;
1278            info!("Requested all metadata from DB");
1279            loop {
1280                let pairs = match keytype {
1281                    KeyType::Modio => raw::get_all_metadata_internal(&self.pool, offset).await?,
1282                    KeyType::Customer => raw::get_all_metadata_customer(&self.pool, offset).await?,
1283                };
1284                if pairs.is_empty() {
1285                    // No more data
1286                    break;
1287                }
1288                offset += pairs.len() as u32;
1289                for val in pairs {
1290                    // If there is a matching builder in the hashmap, use it
1291                    let mut builder = map
1292                        .remove(&val.key)
1293                        // Or create if absent.
1294                        .unwrap_or_else(|| Metadata::builder(&val.key));
1295                    builder = builder.pair(&val.tag, val.value);
1296                    map.insert(val.key, builder);
1297                }
1298            }
1299            let res: Vec<Metadata> = map.drain().map(|(_, builder)| builder.build()).collect();
1300            Ok(res)
1301        }
1302
1303        #[instrument(level = "debug")]
1304        pub async fn get_all_metadata(&self) -> Result<Vec<Metadata>, Error> {
1305            let res = self.get_all_metadata_inner(KeyType::Customer).await?;
1306            Ok(res)
1307        }
1308
1309        #[instrument(level = "debug")]
1310        pub async fn get_all_internal_metadata(&self) -> Result<Vec<Metadata>, Error> {
1311            let res = self.get_all_metadata_inner(KeyType::Modio).await?;
1312            Ok(res)
1313        }
1314        /// Set a tag. Replace it if it was already set
1315        ///
1316        /// Returns bool, truth means it was set,
1317        /// false means it was already set to that value and nothing changed
1318        #[instrument(level = "info")]
1319        async fn metadata_replace_tag(
1320            &self,
1321            key: &str,
1322            tag: TagType,
1323            value: &str,
1324        ) -> Result<bool, Error> {
1325            // Check if we can just skip the entire thing.
1326            if self.metadata_tag_equals(key, tag, value).await {
1327                return Ok(false);
1328            }
1329            // Ensure the name is stored first.
1330            self.add_sensor(key).await?;
1331            raw::metadata_replace_tag(&self.pool, key, tag, value).await?;
1332            Ok(true)
1333        }
1334        /// Set a tag. fail if it is already set.
1335        #[instrument(level = "info")]
1336        async fn metadata_set_tag(
1337            &self,
1338            key: &str,
1339            tag: TagType,
1340            value: &str,
1341        ) -> Result<(), Error> {
1342            self.add_sensor(key).await?;
1343            raw::metadata_set_tag(&self.pool, key, tag, value).await?;
1344            Ok(())
1345        }
1346        #[instrument(level = "info")]
1347        async fn metadata_get_tag(&self, tag: TagType) -> Result<Vec<(String, String)>, Error> {
1348            let res = raw::metadata_get_tag(&self.pool, tag).await?;
1349            Ok(res)
1350        }
        // Returns an error if the key doesn't exist.
1352        #[instrument(level = "info")]
1353        async fn metadata_get_single_tag(&self, key: &str, tag: TagType) -> Result<String, Error> {
1354            let res = raw::metadata_get_single_tag(&self.pool, key, tag).await?;
1355            Ok(res)
1356        }
1357
1358        /// Sometimes we just want to check if the tags are already in place and look the same.
1359        /// This function treats errors as "not equal" and hopes you deal with it.
1360        async fn metadata_tag_equals(&self, key: &str, tag: TagType, value: &str) -> bool {
1361            self.metadata_get_single_tag(key, tag)
1362                .await
1363                .is_ok_and(|old_val| old_val == value)
1364        }
1365
1366        /// Set the name for a sensor
1367        /// If the name was already set, replaces it.
1368        pub async fn metadata_set_name(&self, key: &str, name: &str) -> Result<bool, Error> {
1369            self.metadata_replace_tag(key, TagType::Name, name).await
1370        }
1371
        /// Set the unit for a key.
        /// If the unit is already set to the same value, nothing is written.
        /// If it is already set to a different value, an error is returned.
        /// Returns true if it wrote to the database, false if not.
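        ///
        /// A sketch of the resulting semantics (key and units are illustrative):
        /// ```ignore
        /// assert!(ds.metadata_set_unit("test.test.temp", "C").await?);  // first write
        /// assert!(!ds.metadata_set_unit("test.test.temp", "C").await?); // same value: no-op
        /// assert!(ds.metadata_set_unit("test.test.temp", "F").await.is_err()); // change: error
        /// ```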
1375        pub async fn metadata_set_unit(&self, key: &str, unit: &str) -> Result<bool, Error> {
1376            // Take a peek if we already have a unit for this key first
1377            if let Some(meta) = self.get_metadata(key).await? {
1378                if let Some(u) = meta.u {
1379                    if u == unit {
1380                        // The two are equal, return early rather than cause a Unique error.
1381                        return Ok(false);
1382                    }
1383                }
1384            }
1385            self.metadata_set_tag(key, TagType::Unit, unit).await?;
1386            Ok(true)
1387        }
1388
1389        /// Set the description for a sensor
1390        /// If the description was already set, replaces it.
1391        #[instrument]
1392        pub async fn metadata_set_description(
1393            &self,
1394            key: &str,
1395            description: &str,
1396        ) -> Result<bool, Error> {
1397            self.metadata_replace_tag(key, TagType::Description, description)
1398                .await
1399        }
1400
1401        /// Set the mode for a sensor. (`ReadOnly`, `ReadWrite`, `WriteOnly`)
1402        /// If the mode was already set, replaces it.
1403        #[instrument]
1404        pub async fn metadata_set_mode(&self, key: &str, mode: &SensorMode) -> Result<bool, Error> {
1405            self.metadata_replace_tag(key, TagType::Mode, mode.as_str())
1406                .await
1407        }
1408
        /// Set the enum (lookup table) for a sensor.
1410        /// If the enum was already set, replaces it.
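        ///
        /// A sketch, assuming `ValueMap` behaves like a map from `u32` states to `String`
        /// labels (the key and labels are illustrative):
        /// ```ignore
        /// let value_map: ValueMap = [(0, "off".to_string()), (1, "on".to_string())]
        ///     .into_iter()
        ///     .collect();
        /// ds.metadata_set_enum("test.test.fan", &value_map).await?;
        /// ```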
1411        #[instrument]
1412        pub async fn metadata_set_enum(
1413            &self,
1414            key: &str,
1415            value_map: &ValueMap,
1416        ) -> Result<bool, Error> {
1417            let into = {
1418                // Convert to a BTreeMap to guarantee a stable ordering of the values.
1419                // This creates a map of <&u32, &String>, which serde_json can serialize directly,
1420                // but whose borrowed contents make it awkward to keep around, so we serialize it immediately.
1421                let sorted_map: BTreeMap<_, _> = value_map.iter().collect();
1422                serde_json::to_string(&sorted_map)?
1423            };
1424            self.metadata_replace_tag(key, TagType::Enum, &into).await
1425        }
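
        // Usage sketch for `metadata_set_enum`, mirroring the `get_metadata_enum` test below.
        // Because the map goes through a BTreeMap, the stored JSON has sorted, stringified keys,
        // roughly: {"0":"error","1":"enabled"}.
        //
        //     let value_map = ValueMap::from([(0, "error".to_string()), (1, "enabled".to_string())]);
        //     ds.metadata_set_enum("modio.test.key", &value_map).await?;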
1426
1427        /// Mark the key as a row (bulk) key.
1428        /// The row data is expected to be an array of `DataType` values that is stored as a
1429        /// JSON string and parsed back from it.
1430        /// The length of the array must match the incoming data for this key.
1431        /// Returns true if it wrote to the database, false if the same row was already set.
1432        #[instrument]
1433        pub async fn metadata_set_row(
1434            &self,
1435            key: &str,
1436            row_types: Vec<DataType>,
1437        ) -> Result<bool, Error> {
1438            let rowdata = serde_json::to_string(&row_types)?;
1439            // If we already have row metadata and it is identical, return Ok(false) without writing.
1440            // The logic is similar to that for Unit: we allow _set_ followed by setting the
1441            // same value again, but we forbid changing it.
1442            if let Ok(old_row) = self.metadata_get_single_tag(key, TagType::Row).await {
1443                if old_row == rowdata {
1444                    return Ok(false);
1445                }
1446            }
1447            self.metadata_set_tag(key, TagType::Row, &rowdata).await?;
1448            Ok(true)
1449        }
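
        // Usage sketch for `metadata_set_row` / `metadata_get_row`; the `DataType` values are
        // placeholders, the real variants live in the `DataType` definition:
        //
        //     let row: Vec<DataType> = vec![/* one DataType per element of the bulk payload */];
        //     ds.metadata_set_row("modio.test.key", row).await?;         // Ok(true) on first write
        //     let types = ds.metadata_get_row("modio.test.key").await?;  // parsed back from JSON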
1450
1451        // Return the row (bulk) data types registered for a key (see `metadata_set_row` above).
1452        #[instrument]
1453        pub async fn metadata_get_row(&self, key: &str) -> Result<Vec<DataType>, Error> {
1454            let value = self.metadata_get_single_tag(key, TagType::Row).await?;
1455            let res = DataType::vec_from_str(&value)?;
1456            Ok(res)
1457        }
1458    }
1459
1460    #[cfg(test)]
1461    mod tests {
1462        use super::*;
1463        use std::error::Error as StdError;
1464        use test_log::test;
1465        use timeout_macro::timeouttest;
1466        type TestResult = Result<(), Box<dyn StdError>>;
1467
1468        #[test(timeouttest)]
1469        async fn get_empty_metadata() -> TestResult {
1470            let ds = Datastore::temporary().await;
1471            ds.insert("modio.test.key", "one", 1_620_850_000.0, false)
1472                .await?;
1473            ds.insert("public.test.key", "two", 1_620_850_000.0, false)
1474                .await?;
1475
1476            let res = ds.metadata_get_names().await?;
1477            assert!(res.is_empty(), "should be empty");
1478
1479            let res = ds.get_metadata("modio.test.key").await?;
1480            assert!(res.is_none(), "should not exist");
1481            Ok(())
1482        }
1483
1484        #[test(timeouttest)]
1485        async fn get_metadata_name() -> TestResult {
1486            let ds = Datastore::temporary().await;
1487
1488            // First time should be Ok(true)
1489            let res = ds
1490                .metadata_set_name("modio.test.key", "Modio Test Key")
1491                .await?;
1492            assert!(res);
1493
1494            // Second time should be Ok(false)
1495            let res = ds
1496                .metadata_set_name("modio.test.key", "Modio Test Key")
1497                .await?;
1498            assert!(!res);
1499
1500            let res = ds.metadata_get_names().await?;
1501            assert_eq!(res.len(), 1, "should have one key");
1502            assert_eq!(res[0].name, Some("Modio Test Key".into()), "Should match");
1503            assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1504
1505            ds.metadata_set_name("modio.test.key", "Modio Test Key Two")
1506                .await?;
1507            let res = ds.metadata_get_names().await?;
1508            assert_eq!(res.len(), 1, "should have one key");
1509            assert_eq!(
1510                res[0].name,
1511                Some("Modio Test Key Two".into()),
1512                "Should match"
1513            );
1514            Ok(())
1515        }
1516        #[test(timeouttest)]
1517        async fn get_metadata_description() -> TestResult {
1518            let ds = Datastore::temporary().await;
1519
1520            ds.metadata_set_description("modio.test.key", "Modio Test description")
1521                .await?;
1522            let res = ds.metadata_get_descriptions().await?;
1523            assert_eq!(res.len(), 1, "should have one key");
1524            assert_eq!(
1525                res[0].description,
1526                Some("Modio Test description".into()),
1527                "Should match"
1528            );
1529            assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1530
1531            ds.metadata_set_description(
1532                "modio.test.key",
1533                "The second update is to change the description",
1534            )
1535            .await?;
1536            let res = ds.metadata_get_descriptions().await?;
1537            assert_eq!(res.len(), 1, "should have one key");
1538            Ok(())
1539        }
1540
1541        #[test(timeouttest)]
1542        async fn get_metadata_unit() -> TestResult {
1543            let ds = Datastore::temporary().await;
1544
1545            ds.metadata_set_unit("modio.test.key", "Cel").await?;
1546
1547            let res = ds.metadata_get_units().await?;
1548            assert_eq!(res.len(), 1, "should be one item");
1549            assert_eq!(res[0].u, Some("Cel".into()), "Should be Celsius");
1550            assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1551
1552            let status = ds.metadata_set_unit("modio.test.key", "m").await;
1553            assert!(status.is_err(), "Should not be able to replace unit");
1554            let res = ds.metadata_get_units().await?;
1555            assert_eq!(res[0].u, Some("Cel".into()), "Should still be Celsius");
1556            Ok(())
1557        }
1558
1559        #[test(timeouttest)]
1560        async fn set_unit_unique() -> TestResult {
1561            let ds = Datastore::temporary().await;
1562            ds.metadata_set_unit("modio.test.key", "Cel").await?;
1563            // Calling it again with the same unit should work
1564            ds.metadata_set_unit("modio.test.key", "Cel").await?;
1565
1566            let err = ds
1567                .metadata_set_unit("modio.test.key", "m")
1568                .await
1569                .expect_err("Should get unique constraint failed");
1570            assert_eq!(err.to_string(), "Unique constraint failed");
1571            Ok(())
1572        }
1573
1574        #[test(timeouttest)]
1575        async fn get_metadata_enum() -> TestResult {
1576            let ds = Datastore::temporary().await;
1577
1578            let value_map = ValueMap::from([
1579                (0, "error".to_string()),
1580                (1, "enabled".to_string()),
1581                (2, "disabled".to_string()),
1582            ]);
1583            ds.metadata_set_enum("modio.test.key", &value_map).await?;
1584            let mut res = ds.metadata_get_enum().await?;
1585            assert_eq!(res.len(), 1, "Should have one value");
1586            assert_eq!(res[0].n, "modio.test.key");
1587            assert!(res[0].value_map.is_some());
1588            let entry = res.pop().unwrap();
1589            let vmap = entry.value_map.unwrap();
1590            assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
1591            assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
1592            assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
1593
1594            let humm = ds.get_metadata("modio.test.key").await?;
1595            assert!(humm.is_some());
1596            let humm = humm.unwrap();
1597            assert_eq!(humm.n, "modio.test.key");
1598            assert!(humm.value_map.is_some());
1599            // Unpacking it again should also look the same as above.
1600            let vmap = humm.value_map.unwrap();
1601            assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
1602            assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
1603            assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
1604
1605            Ok(())
1606        }
1607
1608        #[test(timeouttest)]
1609        async fn test_get_all_internal_metadata() -> TestResult {
1610            let ds = Datastore::temporary().await;
1611            let value_map = ValueMap::from([
1612                (0, "error".to_string()),
1613                (1, "enabled".to_string()),
1614                (2, "disabled".to_string()),
1615            ]);
1616            ds.metadata_set_name("customer.data", "Customer name")
1617                .await?;
1618            ds.metadata_set_name("modio.test.dupe", "Modio Test Another Key")
1619                .await?;
1620            ds.metadata_set_enum("modio.test.key", &value_map).await?;
1621            ds.metadata_set_name("modio.test.key", "Modio Test Key")
1622                .await?;
1623            ds.metadata_set_unit("modio.test.key", "Cel").await?;
1624            ds.metadata_set_description("modio.test.key", "Our Description")
1625                .await?;
1626            let res = ds.get_all_internal_metadata().await?;
1627            assert_eq!(res.len(), 2, "should have two keys");
1628
1629            let mut filt: Vec<Metadata> = res
1630                .into_iter()
1631                .filter(|x| x.n == "modio.test.key")
1632                .collect();
1633            let obj = filt.pop().unwrap();
1634            assert_eq!(obj.n, "modio.test.key");
1635            assert_eq!(obj.name, Some("Modio Test Key".into()));
1636            assert_eq!(obj.description, Some("Our Description".into()));
1637            assert_eq!(obj.u, Some("Cel".into()));
1638            assert_eq!(obj.value_map.unwrap().get(&0), Some(&"error".to_string()));
1639
1640            let mut cust = ds.get_all_metadata().await?;
1641            assert_eq!(cust.len(), 1, "should have one key");
1642            let c_item = cust.pop().unwrap();
1643            assert_eq!(c_item.n, "customer.data", "Should be a customer key");
1644            assert_eq!(
1645                c_item.name,
1646                Some("Customer name".into()),
1647                "Should have a customer name"
1648            );
1649            Ok(())
1650        }
1651    }
1652}