1use super::{raw, TagType};
4use crate::buffer::fxtime;
5use crate::buffer::{TimeStatus, VecBuffer};
6use crate::types::{Metric, Statistics, TXMetric};
7use crate::Error;
8use async_std::sync::Mutex;
9use async_std::task;
10use radix_trie::{Trie, TrieCommon};
11use sqlx::SqlitePool;
12use std::sync::Arc;
13use tempfile::NamedTempFile;
14use tracing::instrument;
15use tracing::{debug, error, info, warn};
16
17#[derive(Clone)]
18pub struct Datastore {
19 pool: SqlitePool,
20 buffer: VecBuffer,
21 names: Arc<Mutex<Trie<String, ()>>>,
22 drop_event: Arc<tokio::sync::Notify>,
23}
24
25impl std::fmt::Debug for Datastore {
27 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
28 let opts = &self.pool.options();
29 let mut d = f.debug_struct("Datastore");
30 d.field("pool.size", &self.pool.size())
31 .field("pool.num_idle", &self.pool.num_idle())
32 .field("pool.is_closed", &self.pool.is_closed())
33 .field("pool.acquire_timeout", &opts.get_acquire_timeout())
34 .field("buffer", &self.buffer);
35 match self.names.try_lock() {
36 Some(guard) => {
37 d.field("names", &guard.len());
38 }
39 _ => {
40 d.field("names", &"<Locked>");
41 }
42 }
43 d.finish_non_exhaustive()
44 }
45}
46
47const MAX_PERSIST_AGE: f64 = 57.0;
48
49impl Datastore {
50 pub async fn new(pool: SqlitePool) -> Result<Self, Error> {
56 let buffer = VecBuffer::new();
57 let drop_event = Self::drop_event(pool.clone(), buffer.clone(), None);
58 let names = Arc::new(Mutex::new(Trie::new()));
59 let res = Self {
60 pool,
61 buffer,
62 names,
63 drop_event,
64 };
65 Ok(res)
66 }
67
68 #[must_use]
69 pub fn pool(&self) -> SqlitePool {
70 self.pool.clone()
71 }
72 #[cfg(test)]
73 async fn get_name(&self, key: &str) -> Result<String, Error> {
74 raw::get_name(&self.pool, key).await
75 }
76
77 #[cfg(test)]
78 async fn check_tempdata(&self) -> Result<(), Error> {
79 if let Some(oldest) = self.buffer.oldest() {
80 info!("Oldst value is {}", oldest);
81 } else {
82 info!("Buffer was locked or otherwise occupied.");
83 }
84 Ok(())
85 }
86 #[cfg(test)]
87 async fn sensor_id(&self, key: &str) -> Result<i64, Error> {
88 self.add_sensor(key).await?;
89 raw::sensor_id(&self.pool, key).await
90 }
91 #[cfg(test)]
97 pub(crate) async fn count_transactions(&self) -> Result<i32, Error> {
98 let count = raw::count_transactions(&self.pool).await?;
99 Ok(count)
100 }
101
102 #[instrument]
108 pub async fn get_statistics(&self) -> Result<Statistics, Error> {
109 let mut res = raw::get_statistics(&self.pool).await?;
110 res.buffered = self.buffer.count().map_or(-1, |v| v as i32);
113 Ok(res)
114 }
115 #[instrument(level = "debug", skip(value, time, timefail))]
120 pub async fn insert(
121 &self,
122 key: &str,
123 value: &str,
124 time: f64,
125 timefail: bool,
126 ) -> Result<(), Error> {
127 let arf = vec![Metric {
128 name: key.into(),
129 value: value.into(),
130 time,
131 }];
132 self.insert_bulk(arf, timefail).await
133 }
134
135 async fn insert_bulk_buffer(&self, data: Vec<Metric>, status: TimeStatus) -> Result<(), Error> {
140 for metric in data {
141 self.buffer.add_metric(metric, status).await;
142 }
143 Ok(())
144 }
145 #[instrument(level = "debug")]
146 async fn add_sensor(&self, key: &str) -> Result<(), Error> {
147 {
148 let mut guard = self.names.lock().await;
149 if guard.get(key).is_none() {
150 raw::add_sensor(&self.pool, key).await?;
151 guard.insert(key.to_string(), ());
152 }
153 }
154 Ok(())
155 }
156
157 #[instrument(level = "debug", skip(data))]
163 pub async fn insert_bulk(&self, data: Vec<Metric>, timefail: bool) -> Result<(), Error> {
164 for metric in &data {
166 self.add_sensor(&metric.name).await?;
167 }
168 let status = TimeStatus::from_bool(timefail);
169 self.insert_bulk_buffer(data, status).await?;
170 Ok(())
171 }
172 #[instrument]
177 pub async fn should_persist(&self) -> Result<bool, Error> {
178 const CUTOFF: usize = 100;
179 #[allow(clippy::option_if_let_else)]
180 if let Some(num) = self.buffer.count() {
181 Ok(num > CUTOFF)
182 } else {
183 warn!("Failed to count data in buffer, assuming full.");
184 Ok(true)
185 }
186 }
187
188 #[instrument]
193 pub async fn should_persist_age(&self) -> Result<bool, Error> {
194 let now = fxtime();
195 #[allow(clippy::option_if_let_else)]
196 match self.buffer.oldest() {
197 Some(eldest) => {
198 let age = now - eldest;
199 debug!("Oldest value in buffer is {} ( age: {}s )", &eldest, &age);
200 Ok(age > MAX_PERSIST_AGE)
201 }
202 None => {
203 error!("Buffer locked, assuming we should persist");
204 Ok(true)
205 }
206 }
207 }
208 #[instrument(level = "info", skip(self))]
210 pub async fn persist_data(&self) -> Result<(), Error> {
211 let delete_buffer = self.buffer.consume_metrics().await;
212 debug!("Persisting data, count={}", delete_buffer.len());
213 raw::persist_data(&self.pool, &delete_buffer)
214 .await
215 .inspect_err(|e| error!("Failed to persist, err={e:?}, pool={:?}", &self.pool))?;
216 Ok(())
217 }
218
219 #[instrument(level = "info", skip(self))]
221 pub async fn maybe_persist_data(&self) -> Result<(), Error> {
222 if self.should_persist_age().await? {
223 let delete_buffer = self.buffer.consume_metrics().await;
224 if let Err(e) = raw::persist_data(&self.pool, &delete_buffer).await {
225 self.buffer.add_metrics(delete_buffer).await;
227 warn!("Failed to persist data, continuing. err={:?}", e);
228 };
229 }
230 Ok(())
231 }
232
233 fn drop_event(
242 pool: SqlitePool,
243 buffer: VecBuffer,
244 dbfile: Option<Arc<NamedTempFile>>,
245 ) -> Arc<tokio::sync::Notify> {
246 let notify = Arc::new(tokio::sync::Notify::new());
247 let waiting = notify.clone();
248 task::spawn(async move {
249 waiting.notified().await;
250 info!(
251 "Persisting Buffered data to disk due to deallocation, file={:?}",
252 dbfile
253 );
254 let delete_buffer = buffer.consume_metrics().await;
255 if !delete_buffer.is_empty() {
256 debug!("Persisting count={} buffered values", delete_buffer.len());
257 if let Err(e) = raw::persist_data(&pool, &delete_buffer).await {
258 error!(
259 "Failed to write to database {:?} {:?} file={:?}",
260 &pool, e, &dbfile
261 );
262 if let Some(fil) = dbfile {
263 error!("File: exists: {:?}", fil.path().try_exists());
264 error!("File: metadata: {:?}", fil.path().metadata());
265 }
266 panic!("Failed to persist data {e}");
267 }
268 };
269 info!("Closing pool due to permanence.");
270 pool.close().await;
271 });
272 notify
273 }
274}
275
276impl Drop for Datastore {
277 fn drop(&mut self) {
278 let size = self.pool.size();
279 let closed = self.pool.is_closed();
280 debug!("Dropping datastore size={size}, closed={closed}");
281 self.drop_event.notify_waiters();
286 }
287}
288
289impl Datastore {
291 #[instrument]
292 pub async fn get_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
293 let res = raw::get_batch(&self.pool, size).await?;
294 if res.len() >= (size as usize) {
295 return Ok(res);
296 }
297 self.maybe_persist_data().await?;
313 let res = raw::get_batch(&self.pool, size).await?;
314 Ok(res)
315 }
316
317 #[instrument]
318 pub async fn get_internal_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
319 let res = raw::get_internal_batch(&self.pool, size).await?;
320 if res.len() >= (size as usize) {
322 return Ok(res);
323 }
324 self.maybe_persist_data().await?;
325 let res = raw::get_internal_batch(&self.pool, size).await?;
326 Ok(res)
327 }
328
329 #[instrument(level = "debug", skip(ids))]
330 pub async fn drop_batch(&self, ids: &[i64]) -> Result<(), Error> {
331 raw::drop_batch(&self.pool, ids).await?;
332 Ok(())
333 }
334
335 #[instrument(level = "debug")]
337 pub async fn fix_timefail(&self, adjust: f32) -> Result<u64, Error> {
338 self.persist_data().await?;
339 let count = raw::fix_timefail(&self.pool, adjust).await?;
340 info!(
341 "Updated TIMEFAIL status for count={} items with offset={}",
342 &count, adjust
343 );
344 Ok(count)
345 }
346 #[instrument(level = "debug")]
347 pub async fn get_last_datapoint(&self, key: &str) -> Result<Metric, Error> {
348 if let Some(metric) = self.buffer.get_metric(key).await {
350 return Ok(metric);
351 }
352 let res = raw::get_last_datapoint(&self.pool, key).await?;
353 Ok(res)
354 }
355 #[instrument(level = "debug", skip_all, fields(self))]
357 pub async fn get_latest_logdata(&self) -> Result<Vec<Metric>, Error> {
358 self.maybe_persist_data().await?;
363 let res = raw::get_latest_logdata(&self.pool).await?;
364 Ok(res)
365 }
366}
367
368mod changes {
370 use super::raw;
371 use super::Datastore;
372 use crate::buffer::TimeStatus;
373 use crate::types::Transaction;
374 use crate::Error;
375 use tracing::instrument;
376 use tracing::{debug, info};
377
378 impl Datastore {
379 #[instrument(level = "debug")]
380 pub async fn has_transaction(&self, token: &str) -> Result<bool, Error> {
381 let res = raw::has_transaction(&self.pool, token).await?;
382 Ok(res)
383 }
384
385 #[instrument(level = "info", skip(expected, target, token))]
386 pub async fn transaction_add(
387 &self,
388 key: &str,
389 expected: &str,
390 target: &str,
391 token: &str,
392 ) -> Result<(), Error> {
393 let internal_key = format!("mytemp.internal.change.{key}");
394 self.add_sensor(key).await?;
395 self.add_sensor(&internal_key).await?;
396 info!(
397 "Storing transaction {}, '{}' => '{}'",
398 &key, &expected, &target
399 );
400 raw::transaction_add(&self.pool, key, expected, target, token).await?;
401 Ok(())
402 }
403 #[instrument(level = "info")]
404 pub async fn transaction_get(&self, prefix: &str) -> Result<Vec<Transaction>, Error> {
405 let res = raw::transaction_get(&self.pool, prefix).await?;
406 Ok(res)
407 }
408 #[instrument]
409 pub async fn transaction_fail(
410 &self,
411 transaction_id: i64,
412 timefail: bool,
413 ) -> Result<u64, Error> {
414 use crate::datastore::TransactionStatus::Failed;
415 debug!("Failing transaction with id={}", transaction_id);
416 let timefail = TimeStatus::from_bool(timefail);
417 let count = raw::transaction_mark(&self.pool, transaction_id, Failed, timefail).await?;
418 Ok(count)
419 }
420
421 #[instrument]
422 pub async fn transaction_pass(
423 &self,
424 transaction_id: i64,
425 timefail: bool,
426 ) -> Result<u64, Error> {
427 use crate::datastore::TransactionStatus::Success;
428 debug!("Passing transaction with id={}", transaction_id);
429 let timefail = TimeStatus::from_bool(timefail);
430 let count =
431 raw::transaction_mark(&self.pool, transaction_id, Success, timefail).await?;
432 Ok(count)
433 }
434
435 #[instrument(level = "info")]
437 pub async fn transaction_fail_pending(&self) -> Result<u64, Error> {
438 let count = raw::transaction_fail_pending(&self.pool).await?;
439 info!("Marked {} pending transactions as failed", count);
440 Ok(count)
441 }
442 }
443}
444
445mod clean {
446 use super::raw;
447 use super::Datastore;
448 use crate::CleanResult;
449 use crate::Error;
450 use sqlx::{Connection, SqlitePool};
451 use tracing::instrument;
452 use tracing::{debug, error};
453
454 impl Datastore {
455 #[instrument(level = "info", skip(pool))]
456 pub async fn clean_maintenance(pool: SqlitePool) -> Result<CleanResult, Error> {
457 let mut conn = pool
458 .acquire()
459 .await
460 .inspect_err(|e| error!(err=?e, "Failed to get connection"))?;
461 conn.ping().await?;
462 let trans_failed = raw::fail_queued_transactions(&mut conn)
465 .await
466 .inspect_err(|e| error!("Failed to mark queued transactions. err={:?}", e))
467 .unwrap_or(-1);
468 let trans_old = raw::delete_old_transactions(&mut conn)
469 .await
470 .inspect_err(|e| error!("Failed to delete old transactions. err={:?}", e))
471 .unwrap_or(-1);
472 let data_deleted = raw::delete_old_logdata(&mut conn)
473 .await
474 .inspect_err(|e| error!("Failed to delete old logdata. err={:?}", e))
475 .unwrap_or(-1);
476 raw::need_vacuum_or_shrink(&mut conn)
477 .await
478 .inspect_err(|e| error!("Failed to vacuum / shrink. err={:?}", e))?;
479 conn.close().await?;
480 let res = CleanResult {
481 trans_failed,
482 trans_old,
483 data_deleted,
484 };
485 Ok(res)
486 }
487
488 #[cfg(test)]
489 pub(crate) async fn delete_old_transactions(&self) -> Result<i32, Error> {
491 debug!("delete_old_transactions, grabbing pool connection");
492 let mut conn = self
493 .pool
494 .acquire()
495 .await
496 .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
497 let res = raw::delete_old_transactions(&mut conn).await?;
498 conn.close().await?;
499 Ok(res)
500 }
501
502 #[instrument]
504 pub async fn fail_queued_transactions(&self) -> Result<i32, Error> {
505 debug!("fail_queued_transactions, grabbing pool connection");
506 let mut conn = self
507 .pool
508 .acquire()
509 .await
510 .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
511 let res = raw::fail_queued_transactions(&mut conn).await?;
512 conn.close().await?;
513 Ok(res)
514 }
515
516 #[cfg(test)]
518 pub(crate) async fn delete_old_logdata(&self) -> Result<i32, Error> {
519 debug!("delete_old_logdata, grabbing pool connection");
520 let mut conn = self
521 .pool
522 .acquire()
523 .await
524 .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
525 let res = raw::delete_old_logdata(&mut conn).await?;
526 conn.close().await?;
527 Ok(res)
528 }
529
530 #[cfg(test)]
532 pub async fn delete_random_data(&self) -> Result<i32, Error> {
533 debug!("delete_random_data, grabbing pool connection");
534 let mut conn = self
535 .pool
536 .acquire()
537 .await
538 .inspect_err(|e| error!(err=?e, "Failed to acquire connection"))?;
539 let res = raw::delete_random_data(&mut conn).await?;
540 conn.close().await?;
541 Ok(res)
542 }
543 }
544}
545
546impl Datastore {
547 pub async fn temporary() -> Self {
552 let dbfile = tempfile::Builder::new()
553 .prefix("database")
554 .suffix(".sqlite")
555 .tempfile()
556 .expect("Error on tempfile");
557 Self::new_tempfile(dbfile.into()).await
558 }
559
560 pub async fn new_tempfile(dbfile: Arc<NamedTempFile>) -> Self {
565 use crate::db::SqlitePoolBuilder;
566 let pool = SqlitePoolBuilder::new()
567 .db_path(dbfile.path())
568 .migrate(true)
569 .build()
570 .await
571 .expect("Failed to build pool");
572
573 let buffer = VecBuffer::new();
574 let drop_event = Self::drop_event(pool.clone(), buffer.clone(), Some(dbfile));
575 let names = Arc::new(Mutex::new(Trie::new()));
576 Self {
577 pool,
578 buffer,
579 names,
580 drop_event,
581 }
582 }
583}
584
585#[cfg(test)]
586fn metrc(name: &str, value: &str, time: f64) -> crate::Metric {
587 crate::Metric {
588 name: name.into(),
589 value: value.into(),
590 time,
591 }
592}
593
594#[cfg(test)]
595mod tests {
596 use super::*;
597 use crate::db::SqlitePoolBuilder;
598 use std::error::Error;
599 use test_log::test;
600 use timeout_macro::timeouttest;
601 type TestResult = Result<(), Box<dyn Error>>;
602
603 #[test(timeouttest)]
604 async fn has_file_database() -> TestResult {
605 let tempfile = tempfile::Builder::new()
606 .prefix("loggerdb_has_file_database")
607 .suffix(".sqlite")
608 .tempfile()?;
609 let named_path = tempfile.path();
610 let pool = SqlitePoolBuilder::new()
611 .db_path(named_path)
612 .migrate(true)
613 .build()
614 .await
615 .unwrap();
616 sqlx::query("SELECT * FROM sensor where sensor.name = 'mytemp.internal.sensors'")
617 .fetch_one(&pool)
618 .await?;
619 pool.close().await;
621 drop(pool);
622 Ok(())
623 }
624
625 #[test(timeouttest)]
626 async fn datastore_tempdata() {
627 let ds = Datastore::temporary().await;
628 ds.check_tempdata()
629 .await
630 .expect("Should be able to get data from temp table");
631 }
632
633 #[test(timeouttest)]
634 async fn datastore_names() -> TestResult {
635 let ds = Datastore::temporary().await;
636 let res = ds.get_name("mytemp.internal.sensors").await?;
637 assert_eq!(res, "mytemp.internal.sensors");
638 Ok(())
639 }
640
641 #[test(timeouttest)]
642 async fn add_one_name() -> TestResult {
643 let ds = Datastore::temporary().await;
644 let res = ds.sensor_id("test.test.test").await?;
645 assert_eq!(res, 2);
646 Ok(())
647 }
648
649 #[test(timeouttest)]
650 async fn add_two_names() -> TestResult {
651 let ds = Datastore::temporary().await;
652 let res = ds.sensor_id("test.test.test").await?;
653 assert_eq!(res, 2);
654 let res = ds.sensor_id("test.test.test").await?;
655 assert_eq!(res, 2);
656 let res = ds.sensor_id("test.test.test").await?;
657 assert_eq!(res, 2);
658 let res = ds.get_name("test.test.test").await?;
659 assert_eq!(res, "test.test.test");
660 Ok(())
661 }
662
663 #[test(timeouttest)]
664 async fn insert_timefail() -> TestResult {
665 let ds = Datastore::temporary().await;
666 ds.insert("test.test.ok", "value", 1_620_850_252.0, false)
667 .await?;
668 ds.insert("test.test.ok", "value1", 1_620_850_253.0, false)
669 .await?;
670 ds.insert("test.test.ok", "value2", 1_620_850_255.0, false)
671 .await?;
672 ds.insert("test.test.fimefail", "value", 1_620_850_252.0, true)
673 .await?;
674 Ok(())
675 }
676
677 #[test(timeouttest)]
678 async fn retrieve_last_empty() -> TestResult {
679 let ds = Datastore::temporary().await;
680 let pool = ds.pool();
681 ds.insert("test.test.ok", "value", 1_620_850_252.0, false)
682 .await?;
683 let before_sync = ds.get_last_datapoint("test.test.ok").await;
684 assert!(before_sync.is_ok(), "Should have a value");
685
686 ds.persist_data().await?;
687
688 let after_sync = ds.get_last_datapoint("test.test.ok").await;
689 assert!(after_sync.is_ok(), "Should have a value");
690
691 let delete_all = sqlx::query("DELETE FROM logdata");
692 delete_all.execute(&pool).await?;
693
694 let after = ds.get_last_datapoint("test.test.ok").await;
695 assert!(after.is_err(), "Should fail");
696 Ok(())
697 }
698
699 #[test(timeouttest)]
700 async fn retrieve_sorting() -> TestResult {
701 let ds = Datastore::temporary().await;
702 ds.insert("test.test.one", "value0", 1_620_850_000.0, false)
703 .await?;
704 ds.insert("test.test.one", "value1", 1_620_850_111.0, true)
705 .await?;
706 ds.insert("test.test.one", "value3", 1_620_850_222.0, false)
707 .await?;
708 ds.insert("test.test.two", "value3", 1_620_850_333.0, true)
709 .await?;
710 ds.insert("test.test.two", "value1", 1_620_850_222.0, false)
711 .await?;
712 ds.insert("test.test.two", "value0", 1_620_850_111.0, true)
713 .await?;
714 let res = ds.get_last_datapoint("test.test.two").await?;
715 assert_eq!(res.name, "test.test.two");
716 assert_eq!(res.value, "value3");
717 assert_eq!(res.time, 1_620_850_333.0);
718
719 let res = ds.get_last_datapoint("test.test.one").await?;
720 assert_eq!(res.name, "test.test.one");
721 assert_eq!(res.value, "value3");
722 assert_eq!(res.time, 1_620_850_222.0);
723 Ok(())
724 }
725
726 #[test(timeouttest)]
727 async fn insert_bulk() -> TestResult {
728 let ds = Datastore::temporary().await;
729
730 let vals = vec![
731 metrc("test.test.one", "etta", 16_208_501_111.0),
732 metrc("test.test.two", "etta", 16_208_501_111.0),
733 metrc("test.test.three", "etta", 16_208_501_112.0),
734 metrc("test.test.one", "tvåa", 16_208_502_222.0),
735 metrc("test.test.two", "tvåa", 16_208_502_223.0),
736 metrc("test.test.three", "tvåa", 16_208_502_222.0),
737 metrc("test.test.one", "trea", 16_208_503_333.0),
738 metrc("test.test.two", "trea", 16_208_503_331.0),
739 metrc("test.test.three", "trea", 16_208_503_333.0),
740 metrc("test.test.one", "fyra", 16_208_504_444.0),
741 ];
742 ds.insert_bulk(vals, true).await?;
743 let res = ds.get_last_datapoint("test.test.one").await?;
744 assert_eq!(res.value, "fyra");
745 let res = ds.get_last_datapoint("test.test.three").await?;
746 assert_eq!(res.value, "trea");
747 let res = ds.get_last_datapoint("test.test.two").await?;
748 assert_eq!(res.value, "trea");
749 Ok(())
750 }
751
752 #[test(timeouttest)]
753 async fn insert_persist() -> TestResult {
754 let ds = Datastore::temporary().await;
755
756 let vals = vec![
757 metrc("test.test.one", "etta", 16_208_501_111.0),
758 metrc("test.test.two", "etta", 16_208_501_111.0),
759 metrc("test.test.three", "etta", 16_208_501_112.0),
760 ];
761 let seq = 0..1000;
763 let more: Vec<Metric> = seq
764 .map(|x| metrc("test.test.three", &format!("{x}"), fxtime()))
765 .collect();
766
767 ds.insert_bulk(vals, true).await?;
768 let first = ds.should_persist().await?;
769 assert!(!first, "Should not need persist with only 3 values");
770
771 let should = ds.buffer.get_metric("test.test.one").await.is_some();
772 assert!(
773 should,
774 "Should need persist because test.test.one is in buffer"
775 );
776
777 ds.insert_bulk(more, true).await?;
779 let second = ds.should_persist().await?;
780 assert!(second, "Should need persist with more values");
781
782 let res = ds.persist_data().await;
784 assert!(res.is_ok(), "We should have succesfully persisted data");
785
786 let third = ds.should_persist().await?;
788 assert!(!third, "We should not need persist again");
789
790 let should = ds.buffer.get_metric("test.test.one").await.is_some();
791 assert!(!should, "This key should no longer be in the buffer");
792
793 ds.persist_data().await?;
795 Ok(())
796 }
797
798 #[test(timeouttest)]
799 async fn persist_delete_and_purge() -> TestResult {
800 let ds = Datastore::temporary().await;
801 let vals: Vec<Metric> = (0..1000)
802 .map(|x| {
803 metrc(
804 &format!("test.test.bulk.{}", x % 7),
805 &x.to_string(),
806 fxtime(),
807 )
808 })
809 .collect();
810 ds.insert_bulk(vals, true).await?;
811 let should_count = ds.should_persist().await?;
812 let should_age = ds.should_persist_age().await?;
813 assert!(should_count, "Should persist based on count");
814 assert!(!should_age, "Oldest should not be that big");
815 ds.persist_data().await?;
816 let count = ds.delete_random_data().await?;
817 assert!(count > 0, "Should have deleted some data");
818 Ok(())
819 }
820
821 #[test(timeouttest)]
822 async fn delete_old_logged_values() -> TestResult {
823 let ds = Datastore::temporary().await;
824 ds.insert("test.test.ok", "one", 1_620_850_000.0, false)
825 .await?;
826 ds.insert("test.test.ok", "two", 1_999_950_251.0, false)
827 .await?;
828
829 let metric = ds.get_last_datapoint("test.test.ok").await?;
831 assert_eq!(metric.value, "two");
832
833 let to_xmit = ds.get_batch(50).await?;
835 let mut to_remove = Vec::<i64>::with_capacity(5);
836 for i in to_xmit {
837 to_remove.push(i.id);
838 }
839 ds.drop_batch(&to_remove).await?;
840 let metric = ds.get_last_datapoint("test.test.ok").await?;
844 assert_eq!(metric.value, "two");
845
846 let count = ds.delete_old_logdata().await?;
848 assert_eq!(count, 1);
849
850 let metric = ds.get_last_datapoint("test.test.ok").await?;
852 assert_eq!(metric.value, "two");
853
854 let count = ds.delete_old_logdata().await?;
856 assert_eq!(count, 0);
858
859 Ok(())
860 }
861
862 #[test(timeouttest)]
865 async fn get_latest_datapoints() -> TestResult {
866 let ds = Datastore::temporary().await;
867
868 let vals = vec![
869 metrc("test.test.one", "etta", 1_620_850_111.0),
870 metrc("test.test.two", "etta", 1_620_850_111.0),
871 metrc("test.test.three", "etta", 1_620_850_111.0),
872 metrc("test.test.one", "tvåa", 1_620_850_222.0),
873 metrc("test.test.two", "tvåa", 1_620_850_222.0),
874 metrc("test.test.three", "tvåa", 1_620_850_222.0),
875 metrc("test.test.one", "trea", 1_620_850_333.0),
876 metrc("test.test.three", "trea", 1_620_850_333.0),
877 metrc("test.test.one", "fyra", 1_620_850_444.0),
878 ];
879 ds.insert_bulk(vals, true).await?;
880 ds.persist_data().await?;
881 let res = ds.get_latest_logdata().await?;
882 assert_eq!(res[0].name, "test.test.two");
883 assert_eq!(res[0].value, "tvåa");
884 assert_eq!(res[1].name, "test.test.three");
885 assert_eq!(res[1].value, "trea");
886 assert_eq!(res[2].name, "test.test.one");
887 assert_eq!(res[2].value, "fyra");
888 assert_eq!(res.len(), 3);
889 Ok(())
890 }
891
892 #[test(timeouttest)]
894 async fn get_latest_datapoints_persists() -> TestResult {
895 let ds = Datastore::temporary().await;
896
897 let vals = vec![
898 metrc("test.test.one", "etta", fxtime() - MAX_PERSIST_AGE),
899 metrc("test.test.one", "tvåa", fxtime() - MAX_PERSIST_AGE / 2.0),
900 metrc("test.test.one", "trea", fxtime() - 1.0),
901 ];
902 ds.insert_bulk(vals, true).await?;
903 let res = ds.get_latest_logdata().await?;
904 assert_eq!(res[0].name, "test.test.one");
905 assert_eq!(res[0].value, "trea");
906 assert_eq!(res.len(), 1);
907
908 let vals = vec![metrc("test.test.one", "fyra", fxtime())];
911 ds.insert_bulk(vals, true).await?;
912 let res = ds.get_latest_logdata().await?;
913 assert_eq!(res[0].name, "test.test.one");
914 assert_eq!(res[0].value, "trea");
915 assert_eq!(res.len(), 1);
916
917 ds.persist_data().await?;
919 let res = ds.get_latest_logdata().await?;
920 assert_eq!(res[0].name, "test.test.one");
921 assert_eq!(res[0].value, "fyra");
922 assert_eq!(res.len(), 1);
923
924 Ok(())
925 }
926
927 #[test(timeouttest)]
928 async fn get_transactions() -> TestResult {
929 let ds = Datastore::temporary().await;
930
931 let vals = vec![
932 metrc("test.test.one", "etta", 16_208_501_111.0),
933 metrc("test.test.two", "tvåa", 16_208_504_444.0),
934 ];
935 ds.insert_bulk(vals, false).await?;
936 ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
937 .await?;
938 let res = ds.transaction_get("test.test.one").await?;
939 assert_eq!(res.len(), 1, "Expecting only one result");
940 assert_eq!(
941 res[0].name, "test.test.one",
942 "Expecting key to match added transaction"
943 );
944 assert_eq!(res[0].expected, "etta", "Epected value mismatch");
945 assert_eq!(res[0].target, "ettatvåa", "Target value mismatch");
946 assert_eq!(res[0].t_id, 1, "Transaction ID mismatch");
947 Ok(())
948 }
949
950 async fn with_keys() -> Datastore {
951 let ds = Datastore::temporary().await;
952 let vals = vec![
953 metrc("test.test.one", "etta", 16_208_501_111.0),
954 metrc("test.test.two", "etta", 16_208_501_111.0),
955 metrc("test.test.three", "etta", 16_208_501_112.0),
956 metrc("test.test.two", "tvåa", 16_208_502_223.0),
957 metrc("test.test.three", "tvåa", 16_208_502_222.0),
958 metrc("test.test.three", "trea", 16_208_503_333.0),
959 ];
960
961 ds.insert_bulk(vals, false).await.unwrap();
962 ds
963 }
964
965 #[test(timeouttest)]
966 async fn transmit_drop() -> TestResult {
967 let ds = with_keys().await;
968 ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, false)
969 .await?;
970 ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, true)
971 .await?;
972 ds.persist_data().await?;
973 let to_xmit = ds.get_batch(50).await?;
974 let mut to_remove = Vec::<i64>::with_capacity(50);
975 for i in to_xmit {
976 to_remove.push(i.id);
977 }
978 ds.drop_batch(&to_remove).await?;
979 let second = ds.get_batch(10).await?;
980 assert_eq!(second.len(), 0, "No elements should remain");
981 let third = ds.get_internal_batch(5).await?;
982 assert_eq!(
983 third.len(),
984 1,
985 "Expecting an internal value, because one is timefailed."
986 );
987 Ok(())
988 }
989 #[test(timeouttest)]
990 async fn timefail_handling_internal() -> TestResult {
991 let ds = with_keys().await;
992 ds.insert("modio.test.one", "modio-ett", 16_208_502_222.0, true)
993 .await?;
994 ds.insert("modio.test.one", "modio-ett", 16_208_502_221.0, true)
995 .await?;
996 let res = ds.get_internal_batch(10).await?;
997 assert_eq!(res.len(), 0);
998 ds.fix_timefail(10.0).await?;
999 let res = ds.get_internal_batch(10).await?;
1000 assert_eq!(res.len(), 2);
1001 Ok(())
1002 }
1003
1004 #[test(timeouttest)]
1005 async fn timefail_handling() -> TestResult {
1006 let ds = with_keys().await;
1007 ds.insert("test.test.one", "modio-ett", 1_620_850_222.0, true)
1008 .await?;
1009 ds.insert("test.test.two", "modio-två", 1_620_850_222.0, true)
1010 .await?;
1011 let res = ds.get_batch(10).await?;
1012 assert_eq!(res.len(), 6);
1013 ds.fix_timefail(10.0).await?;
1014 let res = ds.get_batch(10).await?;
1015 assert_eq!(res.len(), 8);
1016 Ok(())
1017 }
1018
1019 #[test(timeouttest)]
1020 async fn fail_transactions() -> TestResult {
1021 use serde::Deserialize;
1022 #[derive(Deserialize)]
1023 struct TransJson {
1024 id: String,
1025 clock: i64,
1026 status: String,
1027 }
1028 let ds = with_keys().await;
1029
1030 ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1031 .await?;
1032 ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1033 .await?;
1034 let first = ds.transaction_get("test.test.one").await?;
1035 assert_eq!(first.len(), 1, "Expecting only one result");
1036 assert_eq!(
1037 first[0].name, "test.test.one",
1038 "Expecting key to match added transaction"
1039 );
1040 ds.transaction_fail(first[0].t_id, false).await?;
1041 let logrow = ds
1042 .get_last_datapoint("mytemp.internal.change.test.test.one")
1043 .await?;
1044 let v1: TransJson = serde_json::from_str(&logrow.value)?;
1045 assert_eq!(v1.status, "FAILED");
1046 assert_eq!(v1.id, "xxxXXxx");
1047 assert!(v1.clock > 0);
1048
1049 let second = ds.transaction_get("test.test.one").await?;
1050 assert_eq!(second.len(), 0, "Should not have pending transactions");
1051
1052 let third = ds.transaction_get("test.test").await?;
1053 assert_eq!(third.len(), 1, "Should have pending for test.test.two");
1054 assert_eq!(
1055 third[0].name, "test.test.two",
1056 "Expecting key to match transaction two"
1057 );
1058 Ok(())
1059 }
1060 #[test(timeouttest)]
1061 async fn empty_fetch() {
1062 let ds = with_keys().await;
1063 let res = ds.get_last_datapoint("abc.def.ghi").await;
1064 assert!(res.is_err(), "Should have an error from absent keys");
1065 }
1066
1067 #[test(timeouttest)]
1068 async fn pass_transactions() -> TestResult {
1069 use serde::Deserialize;
1070 let ds = with_keys().await;
1071
1072 #[derive(Deserialize)]
1073 struct TransJson {
1074 id: String,
1075 clock: i64,
1076 status: String,
1077 }
1078
1079 ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1080 .await?;
1081 ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1082 .await?;
1083 let first = ds.transaction_get("test.test.one").await?;
1084
1085 assert_eq!(first.len(), 1, "Expecting only one result");
1086 assert_eq!(
1087 first[0].name, "test.test.one",
1088 "Expecting key to match added transaction"
1089 );
1090 ds.transaction_pass(first[0].t_id, false).await?;
1091 let logrow = ds
1092 .get_last_datapoint("mytemp.internal.change.test.test.one")
1093 .await?;
1094 let v1: TransJson = serde_json::from_str(&logrow.value)?;
1095 assert_eq!(v1.status, "SUCCESS");
1096 assert_eq!(v1.id, "xxxXXxx");
1097 assert!(v1.clock > 0);
1098
1099 let second = ds.transaction_get("test.test.one").await?;
1100 assert_eq!(second.len(), 0, "Should not have pending transactions");
1101
1102 let third = ds.transaction_get("test.test").await?;
1103 assert_eq!(third.len(), 1, "Should have pending for test.test.two");
1104 assert_eq!(
1105 third[0].name, "test.test.two",
1106 "Expecting key to match transaction two"
1107 );
1108 let third_row = ds
1110 .get_last_datapoint("mytemp.internal.change.test.test.two")
1111 .await;
1112 assert!(third_row.is_err());
1113 Ok(())
1114 }
1115
1116 #[test(timeouttest)]
1117 async fn delete_old_transactions() -> TestResult {
1118 let ds = with_keys().await;
1119
1120 ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
1121 .await?;
1122 ds.transaction_add("test.test.one", "tvåa", "ettatvåa", "zzZZZzzz")
1123 .await?;
1124 ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
1125 .await?;
1126
1127 assert_eq!(ds.count_transactions().await?, 3);
1128 let trans = ds.transaction_get("test.test.one").await?;
1130 ds.transaction_pass(trans[0].t_id, false).await?;
1131 ds.transaction_fail(trans[1].t_id, false).await?;
1132
1133 assert_eq!(ds.count_transactions().await?, 3);
1134
1135 let to_xmit = ds.get_batch(50).await?;
1137 let mut to_remove = Vec::<i64>::with_capacity(5);
1138 for i in to_xmit {
1139 to_remove.push(i.id);
1140 }
1141 ds.drop_batch(&to_remove).await?;
1142 assert_eq!(ds.count_transactions().await?, 3);
1144 let count = ds.delete_old_transactions().await?;
1146 assert_eq!(count, 1);
1147
1148 assert_eq!(ds.count_transactions().await?, 2);
1151
1152 let count = ds.delete_old_transactions().await?;
1154 assert_eq!(count, 0);
1156 Ok(())
1157 }
1158
1159 #[test(timeouttest)]
1160 async fn fail_queued_transactions() -> TestResult {
1161 use uuid::Uuid;
1162
1163 let ds = with_keys().await;
1164
1165 for x in 0..18 {
1166 let tok = Uuid::new_v4().hyphenated().to_string();
1167 let val = format!("newval{x}");
1168 ds.transaction_add("test.test.one", "etta", &val, &tok)
1169 .await?;
1170 }
1171 ds.transaction_add("test.test.two", "etta", "ettatvåa", "xxxXXxx")
1172 .await?;
1173 ds.transaction_add("test.test.three", "tvåa", "ettatvåa", "zzZZZzzz")
1174 .await?;
1175 ds.transaction_add("test.test.four", "etta", "ettatvåa", "YYyyyyYYY")
1176 .await?;
1177 ds.transaction_add("test.test.one", "etta", "lasttarget", "xxXXxxxXXxxx")
1178 .await?;
1179 assert_eq!(ds.count_transactions().await?, 22);
1180
1181 let count = ds.fail_queued_transactions().await?;
1183 assert_eq!(count, 3, "3 should be marked as failed");
1184
1185 ds.delete_old_transactions().await?;
1186 assert_eq!(ds.count_transactions().await?, 19, "19 should exist");
1187
1188 let res = ds.transaction_get("test.test.one").await?;
1189 let lastval = res.last().unwrap();
1190 assert_eq!(
1191 lastval.target, "lasttarget",
1192 "Expecting last transaction to be the last added"
1193 );
1194 Ok(())
1195 }
1196}
1197
1198mod metadata {
1199 use super::{raw, TagType};
1200 use super::{Datastore, Error};
1201 use crate::types::DataType;
1202 use crate::types::Metadata;
1203 use crate::types::SensorMode;
1204 use crate::types::ValueMap;
1205 use std::collections::{BTreeMap, HashMap};
1206 use tracing::info;
1207 use tracing::instrument;
1208
1209 #[derive(Debug)]
1210 enum KeyType {
1211 Modio,
1212 Customer,
1213 }
1214 impl Datastore {
1215 #[instrument]
1216 pub async fn metadata_get_names(&self) -> Result<Vec<Metadata>, Error> {
1217 info!("Requested all names");
1218 let vals = self.metadata_get_tag(TagType::Name).await?;
1219 let res = vals
1220 .into_iter()
1221 .map(|(key, name)| Metadata::builder(key).name(name).build())
1222 .collect();
1223 Ok(res)
1224 }
1225
1226 #[instrument]
1227 pub async fn metadata_get_units(&self) -> Result<Vec<Metadata>, Error> {
1228 info!("Requested all units");
1229 let vals = self.metadata_get_tag(TagType::Unit).await?;
1230 let res = vals
1231 .into_iter()
1232 .map(|(key, unit)| Metadata::builder(key).unit(unit).build())
1233 .collect();
1234 Ok(res)
1235 }
1236
1237 #[instrument]
1238 pub async fn metadata_get_descriptions(&self) -> Result<Vec<Metadata>, Error> {
1239 info!("Requested all units");
1240 let vals = self.metadata_get_tag(TagType::Description).await?;
1241 let res = vals
1242 .into_iter()
1243 .map(|(key, description)| Metadata::builder(key).description(description).build())
1244 .collect();
1245 Ok(res)
1246 }
1247
1248 #[instrument]
1249 pub async fn metadata_get_enum(&self) -> Result<Vec<Metadata>, Error> {
1250 info!("Requested all value maps");
1251 let vals = self.metadata_get_tag(TagType::Enum).await?;
1252 let res = vals
1253 .into_iter()
1254 .map(|(key, stringy)| Metadata::builder(key).value_map_string(stringy).build())
1255 .collect();
1256 Ok(res)
1257 }
1258 #[instrument(level = "info")]
1259 pub async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, Error> {
1260 let pairs = raw::get_metadata(&self.pool, key).await?;
1261 if pairs.is_empty() {
1264 return Ok(None);
1265 }
1266 let mut builder = Metadata::builder(key);
1267 for (tag, value) in pairs {
1268 builder = builder.pair(&tag, value);
1269 }
1270 let res = builder.build();
1271 Ok(Some(res))
1272 }
1273 #[instrument(level = "info")]
1275 async fn get_all_metadata_inner(&self, keytype: KeyType) -> Result<Vec<Metadata>, Error> {
1276 let mut map = HashMap::new();
1277 let mut offset: u32 = 0;
1278 info!("Requested all metadata from DB");
1279 loop {
1280 let pairs = match keytype {
1281 KeyType::Modio => raw::get_all_metadata_internal(&self.pool, offset).await?,
1282 KeyType::Customer => raw::get_all_metadata_customer(&self.pool, offset).await?,
1283 };
1284 if pairs.is_empty() {
1285 break;
1287 }
1288 offset += pairs.len() as u32;
1289 for val in pairs {
1290 let mut builder = map
1292 .remove(&val.key)
1293 .unwrap_or_else(|| Metadata::builder(&val.key));
1295 builder = builder.pair(&val.tag, val.value);
1296 map.insert(val.key, builder);
1297 }
1298 }
1299 let res: Vec<Metadata> = map.drain().map(|(_, builder)| builder.build()).collect();
1300 Ok(res)
1301 }
1302
1303 #[instrument(level = "debug")]
1304 pub async fn get_all_metadata(&self) -> Result<Vec<Metadata>, Error> {
1305 let res = self.get_all_metadata_inner(KeyType::Customer).await?;
1306 Ok(res)
1307 }
1308
1309 #[instrument(level = "debug")]
1310 pub async fn get_all_internal_metadata(&self) -> Result<Vec<Metadata>, Error> {
1311 let res = self.get_all_metadata_inner(KeyType::Modio).await?;
1312 Ok(res)
1313 }
1314 #[instrument(level = "info")]
1319 async fn metadata_replace_tag(
1320 &self,
1321 key: &str,
1322 tag: TagType,
1323 value: &str,
1324 ) -> Result<bool, Error> {
1325 if self.metadata_tag_equals(key, tag, value).await {
1327 return Ok(false);
1328 }
1329 self.add_sensor(key).await?;
1331 raw::metadata_replace_tag(&self.pool, key, tag, value).await?;
1332 Ok(true)
1333 }
1334 #[instrument(level = "info")]
1336 async fn metadata_set_tag(
1337 &self,
1338 key: &str,
1339 tag: TagType,
1340 value: &str,
1341 ) -> Result<(), Error> {
1342 self.add_sensor(key).await?;
1343 raw::metadata_set_tag(&self.pool, key, tag, value).await?;
1344 Ok(())
1345 }
1346 #[instrument(level = "info")]
1347 async fn metadata_get_tag(&self, tag: TagType) -> Result<Vec<(String, String)>, Error> {
1348 let res = raw::metadata_get_tag(&self.pool, tag).await?;
1349 Ok(res)
1350 }
1351 #[instrument(level = "info")]
1353 async fn metadata_get_single_tag(&self, key: &str, tag: TagType) -> Result<String, Error> {
1354 let res = raw::metadata_get_single_tag(&self.pool, key, tag).await?;
1355 Ok(res)
1356 }
1357
1358 async fn metadata_tag_equals(&self, key: &str, tag: TagType, value: &str) -> bool {
1361 self.metadata_get_single_tag(key, tag)
1362 .await
1363 .is_ok_and(|old_val| old_val == value)
1364 }
1365
1366 pub async fn metadata_set_name(&self, key: &str, name: &str) -> Result<bool, Error> {
1369 self.metadata_replace_tag(key, TagType::Name, name).await
1370 }
1371
1372 pub async fn metadata_set_unit(&self, key: &str, unit: &str) -> Result<bool, Error> {
1376 if let Some(meta) = self.get_metadata(key).await? {
1378 if let Some(u) = meta.u {
1379 if u == unit {
1380 return Ok(false);
1382 }
1383 }
1384 }
1385 self.metadata_set_tag(key, TagType::Unit, unit).await?;
1386 Ok(true)
1387 }
1388
1389 #[instrument]
1392 pub async fn metadata_set_description(
1393 &self,
1394 key: &str,
1395 description: &str,
1396 ) -> Result<bool, Error> {
1397 self.metadata_replace_tag(key, TagType::Description, description)
1398 .await
1399 }
1400
1401 #[instrument]
1404 pub async fn metadata_set_mode(&self, key: &str, mode: &SensorMode) -> Result<bool, Error> {
1405 self.metadata_replace_tag(key, TagType::Mode, mode.as_str())
1406 .await
1407 }
1408
1409 #[instrument]
1412 pub async fn metadata_set_enum(
1413 &self,
1414 key: &str,
1415 value_map: &ValueMap,
1416 ) -> Result<bool, Error> {
1417 let into = {
1418 let sorted_map: BTreeMap<_, _> = value_map.iter().collect();
1422 serde_json::to_string(&sorted_map)?
1423 };
1424 self.metadata_replace_tag(key, TagType::Enum, &into).await
1425 }
1426
1427 #[instrument]
1433 pub async fn metadata_set_row(
1434 &self,
1435 key: &str,
1436 row_types: Vec<DataType>,
1437 ) -> Result<bool, Error> {
1438 let rowdata = serde_json::to_string(&row_types)?;
1439 if let Ok(old_row) = self.metadata_get_single_tag(key, TagType::Row).await {
1443 if old_row == rowdata {
1444 return Ok(false);
1445 }
1446 }
1447 self.metadata_set_tag(key, TagType::Row, &rowdata).await?;
1448 Ok(true)
1449 }
1450
1451 #[instrument]
1453 pub async fn metadata_get_row(&self, key: &str) -> Result<Vec<DataType>, Error> {
1454 let value = self.metadata_get_single_tag(key, TagType::Row).await?;
1455 let res = DataType::vec_from_str(&value)?;
1456 Ok(res)
1457 }
1458 }
1459
1460 #[cfg(test)]
1461 mod tests {
1462 use super::*;
1463 use std::error::Error as StdError;
1464 use test_log::test;
1465 use timeout_macro::timeouttest;
1466 type TestResult = Result<(), Box<dyn StdError>>;
1467
1468 #[test(timeouttest)]
1469 async fn get_empty_metadata() -> TestResult {
1470 let ds = Datastore::temporary().await;
1471 ds.insert("modio.test.key", "one", 1_620_850_000.0, false)
1472 .await?;
1473 ds.insert("public.test.key", "two", 1_620_850_000.0, false)
1474 .await?;
1475
1476 let res = ds.metadata_get_names().await?;
1477 assert!(res.is_empty(), "should be empty");
1478
1479 let res = ds.get_metadata("modio.test.key").await?;
1480 assert!(res.is_none(), "should not exist");
1481 Ok(())
1482 }
1483
1484 #[test(timeouttest)]
1485 async fn get_metadata_name() -> TestResult {
1486 let ds = Datastore::temporary().await;
1487
1488 let res = ds
1490 .metadata_set_name("modio.test.key", "Modio Test Key")
1491 .await?;
1492 assert!(res);
1493
1494 let res = ds
1496 .metadata_set_name("modio.test.key", "Modio Test Key")
1497 .await?;
1498 assert!(!res);
1499
1500 let res = ds.metadata_get_names().await?;
1501 assert_eq!(res.len(), 1, "should have one key");
1502 assert_eq!(res[0].name, Some("Modio Test Key".into()), "Should match");
1503 assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1504
1505 ds.metadata_set_name("modio.test.key", "Modio Test Key Two")
1506 .await?;
1507 let res = ds.metadata_get_names().await?;
1508 assert_eq!(res.len(), 1, "should have one key");
1509 assert_eq!(
1510 res[0].name,
1511 Some("Modio Test Key Two".into()),
1512 "Should match"
1513 );
1514 Ok(())
1515 }
1516 #[test(timeouttest)]
1517 async fn get_metadata_description() -> TestResult {
1518 let ds = Datastore::temporary().await;
1519
1520 ds.metadata_set_description("modio.test.key", "Modio Test description")
1521 .await?;
1522 let res = ds.metadata_get_descriptions().await?;
1523 assert_eq!(res.len(), 1, "should have one key");
1524 assert_eq!(
1525 res[0].description,
1526 Some("Modio Test description".into()),
1527 "Should match"
1528 );
1529 assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1530
1531 ds.metadata_set_description(
1532 "modio.test.key",
1533 "The second update is to change the description",
1534 )
1535 .await?;
1536 let res = ds.metadata_get_descriptions().await?;
1537 assert_eq!(res.len(), 1, "should have one key");
1538 Ok(())
1539 }
1540
1541 #[test(timeouttest)]
1542 async fn get_metadata_unit() -> TestResult {
1543 let ds = Datastore::temporary().await;
1544
1545 ds.metadata_set_unit("modio.test.key", "Cel").await?;
1546
1547 let res = ds.metadata_get_units().await?;
1548 assert_eq!(res.len(), 1, "should be one item");
1549 assert_eq!(res[0].u, Some("Cel".into()), "Should be Celsius");
1550 assert_eq!(res[0].n, "modio.test.key", "Should be our key");
1551
1552 let status = ds.metadata_set_unit("modio.test.key", "m").await;
1553 assert!(status.is_err(), "Should not be able to replace unit");
1554 let res = ds.metadata_get_units().await?;
1555 assert_eq!(res[0].u, Some("Cel".into()), "Should still be Celsius");
1556 Ok(())
1557 }
1558
1559 #[test(timeouttest)]
1560 async fn set_unit_unique() -> TestResult {
1561 let ds = Datastore::temporary().await;
1562 ds.metadata_set_unit("modio.test.key", "Cel").await?;
1563 ds.metadata_set_unit("modio.test.key", "Cel").await?;
1565
1566 let err = ds
1567 .metadata_set_unit("modio.test.key", "m")
1568 .await
1569 .expect_err("Should get unique constraint failed");
1570 assert_eq!(err.to_string(), "Unique constraint failed");
1571 Ok(())
1572 }
1573
1574 #[test(timeouttest)]
1575 async fn get_metadata_enum() -> TestResult {
1576 let ds = Datastore::temporary().await;
1577
1578 let value_map = ValueMap::from([
1579 (0, "error".to_string()),
1580 (1, "enabled".to_string()),
1581 (2, "disabled".to_string()),
1582 ]);
1583 ds.metadata_set_enum("modio.test.key", &value_map).await?;
1584 let mut res = ds.metadata_get_enum().await?;
1585 assert_eq!(res.len(), 1, "Should have one value");
1586 assert_eq!(res[0].n, "modio.test.key");
1587 assert!(res[0].value_map.is_some());
1588 let entry = res.pop().unwrap();
1589 let vmap = entry.value_map.unwrap();
1590 assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
1591 assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
1592 assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
1593
1594 let humm = ds.get_metadata("modio.test.key").await?;
1595 assert!(humm.is_some());
1596 let humm = humm.unwrap();
1597 assert_eq!(humm.n, "modio.test.key");
1598 assert!(humm.value_map.is_some());
1599 let vmap = humm.value_map.unwrap();
1601 assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
1602 assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
1603 assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
1604
1605 Ok(())
1606 }
1607
1608 #[test(timeouttest)]
1609 async fn test_get_all_internal_metadata() -> TestResult {
1610 let ds = Datastore::temporary().await;
1611 let value_map = ValueMap::from([
1612 (0, "error".to_string()),
1613 (1, "enabled".to_string()),
1614 (2, "disabled".to_string()),
1615 ]);
1616 ds.metadata_set_name("customer.data", "Customer name")
1617 .await?;
1618 ds.metadata_set_name("modio.test.dupe", "Modio Test Another Key")
1619 .await?;
1620 ds.metadata_set_enum("modio.test.key", &value_map).await?;
1621 ds.metadata_set_name("modio.test.key", "Modio Test Key")
1622 .await?;
1623 ds.metadata_set_unit("modio.test.key", "Cel").await?;
1624 ds.metadata_set_description("modio.test.key", "Our Description")
1625 .await?;
1626 let res = ds.get_all_internal_metadata().await?;
1627 assert_eq!(res.len(), 2, "should have one key");
1628
1629 let mut filt: Vec<Metadata> = res
1630 .into_iter()
1631 .filter(|x| x.n == "modio.test.key")
1632 .collect();
1633 let obj = filt.pop().unwrap();
1634 assert_eq!(obj.n, "modio.test.key");
1635 assert_eq!(obj.name, Some("Modio Test Key".into()));
1636 assert_eq!(obj.description, Some("Our Description".into()));
1637 assert_eq!(obj.u, Some("Cel".into()));
1638 assert_eq!(obj.value_map.unwrap().get(&0), Some(&"error".to_string()));
1639
1640 let mut cust = ds.get_all_metadata().await?;
1641 assert_eq!(cust.len(), 1, "should have one key");
1642 let c_item = cust.pop().unwrap();
1643 assert_eq!(c_item.n, "customer.data", "Should be a customer key");
1644 assert_eq!(
1645 c_item.name,
1646 Some("Customer name".into()),
1647 "Should have a customer name"
1648 );
1649 Ok(())
1650 }
1651 }
1652}