// mocklogger/logger1.rs

// Author: D.S. Ljungmark <spider@skuggor.se>, Modio AB
// SPDX-License-Identifier: AGPL-3.0-or-later
3use crate::logger::{keys, LogErr};
4use crate::timefail;
5use crate::types::Metadata;
6use crate::types::Unit;
7use crate::types::ValueMap;
8use crate::values::{
9    check_data_row, valid_metric, value_to_string, zbus_row_to_json_string, TimestampRow,
10};
11use fsipc::logger1::SensorMode;
12use modio_logger_db::{Datastore, Metric};
13use std::collections::HashMap;
14use tracing::{debug, error, info, instrument, warn};
15use zbus::zvariant::Value as zValue;
16use zbus::{interface, object_server::SignalEmitter};
17mod builder;
18pub use builder::Builder;
19
/// The `se.modio.logger.Logger1` D-Bus service object.
///
/// Persists metrics and metadata into the backing datastore and keeps track
/// of the clock-trust ("TIMEFAIL") state for time-sensitive inserts.
pub struct Logger1 {
    // Backing datastore for metrics and metadata.
    ds: Datastore,
    // Clock-trust tracker; queried via `is_timefail()` before inserts.
    timefail: timefail::Timefail,
}
24
impl Logger1 {
    /// Create a new `Logger1` wrapping the given datastore.
    ///
    /// When starting up in TIMEFAIL mode (untrusted system clock), all
    /// pending change requests in the datastore are failed immediately via
    /// `transaction_fail_pending`, and the number of failed requests is
    /// logged.
    ///
    /// # Errors
    /// Returns `LogErr` if failing the pending transactions in the datastore
    /// fails.
    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
        if timefail.is_timefail() {
            info!("We are currently in TIMEFAIL mode.");
            let count = ds.transaction_fail_pending().await?;
            if count > 0 {
                info!("Failed {count} pending change requests due to TIMEFAIL");
            }
        }
        Ok(Self { ds, timefail })
    }
    /// Start building a `Logger1`; see [`Builder`].
    #[must_use]
    pub const fn builder() -> Builder {
        Builder::new()
    }

    /// Test helper: flush buffered data to the database, panicking on error.
    #[cfg(test)]
    pub(crate) async fn persist_data(&self) {
        self.ds
            .persist_data()
            .await
            .expect("Failed to persist data");
    }
}
49
// This is a simple wrapper around the modio_logger_db `DataType`, but since that version does not
// know about "dbus" or how to implement zbus::zvariant::Type, we wrap it in a struct here, in a
// pattern called "Newtype" in rust. This lets us add the missing functionality, but not
// re-implement it.
#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, zbus::zvariant::Type)]
#[zvariant(signature = "s")] // Here we say that it should be a "string" on d-bus and not a struct
                             // or enum.
struct DataTypeWrp(modio_logger_db::DataType);
58
// One (name, datatype) column of a row definition as received over D-Bus.
// Serialized to JSON and stored as the key's description in `set_metadata_row`.
#[derive(Debug, serde::Serialize, serde::Deserialize, zbus::zvariant::Type)]
struct RowItem {
    // Human-readable name of this column.
    name: String,
    // Column data type, wrapped so it serializes as a D-Bus string.
    datatype: DataTypeWrp,
}
64
65#[must_use]
66fn maybe_metric(key: impl Into<String>, val: &zValue<'_>, time: f64) -> Option<Metric> {
67    value_to_string(val).map(|value| Metric {
68        name: key.into(),
69        value,
70        time,
71    })
72}
73
74fn metadata_change_metric() -> Metric {
75    let key = "modio.logger.metadata.change";
76    assert!(
77        keys::valid_key(key).is_ok(),
78        "Hardcoded key name should be valid"
79    );
80    let time = modio_logger_db::fxtime_ms();
81    Metric {
82        name: key.into(),
83        value: "1".into(),
84        time,
85    }
86}
87
88impl Logger1 {
89    // When metadata change is succesful, store a key & timestamp and fire off a signal, this
90    // allows us to check the "last changed" timestamp in the submitter and decide if we need to
91    // read all metadata and see what changes or not.
92    async fn metadata_last_changed(
93        &self,
94        ctxt: &SignalEmitter<'_>,
95        key: &str,
96    ) -> Result<(), LogErr> {
97        let timefail = self.timefail.is_timefail();
98        let metric = metadata_change_metric();
99        let batch = vec![metric];
100        self.ds.insert_bulk(batch, timefail).await?;
101        Self::metadata_updated(ctxt, key).await?;
102        Ok(())
103    }
104}
105
// NOTE(review): `use_self` appears to be tripped by the zbus `interface`
// macro expansion — confirm before removing the allow.
#[allow(clippy::use_self)]
#[interface(name = "se.modio.logger.Logger1")]
impl Logger1 {
    /// Signal sent when new metadata is set
    #[zbus(signal)]
    async fn metadata_updated(ctxt: &SignalEmitter<'_>, key: &str) -> zbus::Result<()>;

    /// Set the display name metadata for a key.
    ///
    /// The metadata-change bookkeeping (and thus the `MetadataUpdated`
    /// signal) only happens when the datastore reports that the stored name
    /// was actually updated.
    // Should emit "MetadataUpdated"
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_name(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        name: String,
    ) -> Result<(), LogErr> {
        if self
            .ds
            .metadata_set_name(key, &name)
            .await
            .inspect_err(|e| error!("Failed to set metadata name  err={:?}", e))?
        {
            info!("Updated name of key={} to name='{}'", &key, &name);
            self.metadata_last_changed(&ctxt, key).await?;
        }
        Ok(())
    }

    /// Set the description metadata for a key.
    ///
    /// Emits `MetadataUpdated` via `metadata_last_changed` only when the
    /// stored description actually changed.
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_description(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        description: String,
    ) -> Result<(), LogErr> {
        if self
            .ds
            .metadata_set_description(key, &description)
            .await
            .inspect_err(|e| error!("Failed to set metadata description err={:?}", e))?
        {
            info!(
                "Updated description of key={} to description='{}'",
                &key, &description
            );
            self.metadata_last_changed(&ctxt, key).await?;
        }
        Ok(())
    }

    // Set Metadata for row batches
    //
    // row_items is an array of (Name, Datatype) where both are (currently) strings.
    // The items in a batch array should not change (types or counts) while the names can be.
    // The names will be written to the Description field, and the data-types will be stored
    // elsewhere.
    //
    // Currently, it's all dumped as description to make hacking easier.
    //
    // This should only be used on keys that expect to store rows of correlated, time-stamped data.
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_row(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        row_items: Vec<RowItem>,
    ) -> Result<(), LogErr> {
        // Create a Vec of the "datatypes" in the order they come in.
        let row_metadata: Vec<modio_logger_db::DataType> =
            row_items.iter().map(|x| x.datatype.0).collect();

        // And turn the row items into a JSON description string.
        let desc = serde_json::to_string(&row_items)?;

        // If we fail to set the row data, fex. the row already has a different row-definition, we
        // fail here.
        self.ds
            .metadata_set_row(key, row_metadata)
            .await
            .inspect_err(|e| error!("Failed to set metadata row err={:?}", e))?;
        info!("Updated row metadata of key='{}'", key);

        if self
            .ds
            .metadata_set_description(key, &desc)
            .await
            .inspect_err(|e| error!("Failed to set metadata description err={:?}", e))?
        {
            info!(
                "Updated description of key={} to auto generated description='{}'",
                &key, &desc
            );
            self.metadata_last_changed(&ctxt, key).await?;
        }
        Ok(())
    }

    /// Set the sensor mode metadata (e.g. read-only / read-write) for a key.
    ///
    /// Emits `MetadataUpdated` only when the stored mode actually changed.
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_mode(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        mode: SensorMode,
    ) -> Result<(), LogErr> {
        // Convert the D-Bus SensorMode to the datastore's own mode type.
        let new_mode: modio_logger_db::SensorMode = mode.into();
        if self
            .ds
            .metadata_set_mode(key, &new_mode)
            .await
            .inspect_err(|e| error!("Failed to set metadata mode err={:?}", e))?
        {
            info!("Updated mode of key={} to mode='{:?}'", &key, &new_mode);
            self.metadata_last_changed(&ctxt, key).await?;
        }
        Ok(())
    }

    /// Set the enum value-map metadata for a key.
    ///
    /// Emits `MetadataUpdated` only when the stored map actually changed.
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_value_map(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        value_map: ValueMap,
    ) -> Result<(), LogErr> {
        if self
            .ds
            .metadata_set_enum(key, &value_map)
            .await
            .inspect_err(|e| error!("Failed to set metadata enum err={:?}", e))?
        {
            info!(
                "Updated metadata of key={} to enum='{:?}'",
                &key, &value_map
            );
            self.metadata_last_changed(&ctxt, key).await?;
        }
        Ok(())
    }

    /// Set the unit metadata for a key.
    ///
    /// The unit string is validated by round-tripping through [`Unit`].
    /// Replacing an already-set, different unit is rejected with
    /// `UnitError::Unique`.
    #[instrument(skip(self, ctxt))]
    async fn set_metadata_unit(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: &str,
        unit: String,
    ) -> Result<(), LogErr> {
        use crate::types::UnitError;
        use std::convert::TryFrom;

        // First validate by turning it into our Unit, then return it back to a string format.
        let unit = Unit::try_from(unit)?.into_inner();

        let res = self.ds.metadata_set_unit(key, &unit).await;

        // Error handling here is more involved than normally.
        // We want to return a UnitError "Unique" when we get a Unique constraint failure from the
        // database, as we do not permit overwriting unit errors.
        //
        // This in turn means that we need to manually match the error type, rather than let
        // automatic conversion do it for us.
        match res {
            // Actually wrote the update to db
            Ok(true) => {
                info!("Updated unit of key={} to unit={}", &key, &unit);
                self.metadata_last_changed(&ctxt, key).await?;
                Ok(())
            }
            // Did not change the db
            Ok(false) => Ok(()),
            Err(modio_logger_db::Error::Unique { source }) => {
                debug!("Throwing away database error: {:?}", source);
                Err(UnitError::Unique.into())
            }
            Err(e) => {
                error!("Failed to set metadata unit. err={:?}", e);
                Err(e.into())
            }
        }
    }

    /// Fetch the metadata for a key.
    ///
    /// Returns `LogErr::NoMetadata` when the key exists but has no metadata;
    /// key-not-found errors are only logged at debug level since they are the
    /// caller's problem.
    #[instrument(skip(self))]
    async fn get_metadata(&mut self, key: &str) -> Result<Metadata, LogErr> {
        // Get the metadata for the key.  Error if not found?
        //
        info!("Fetching metadata for key={}", &key);
        match self.ds.get_metadata(key).await {
            Ok(None) => Err(LogErr::NoMetadata),
            Ok(Some(data)) => Ok(Metadata::from(data)),
            // We don't log key not found errors as they are the callers problem.
            Err(e @ modio_logger_db::Error::NotFound { .. }) => {
                debug!(
                    "Requested metadata for non-existing key. key={}, err={:?}",
                    key, e
                );
                Err(e.into())
            }
            Err(e) => {
                error!("Failed to fetch metadata from db. err={:?}", e);
                Err(e.into())
            }
        }
    }

    /// Signal sent when values are stored
    #[zbus(signal)]
    async fn store_signal(
        ctxt: &SignalEmitter<'_>,
        batch: Vec<(String, zValue<'_>, i64)>,
    ) -> zbus::Result<()>;

    /// Store a batch of key/value metrics, timestamped "now".
    ///
    /// All keys and values are validated before anything is stored: a single
    /// invalid entry rejects the whole batch. After a successful store, a
    /// `StoreSignal` is emitted containing only the keys that do not start
    /// with "modio." (internal keys are stored but not signalled).
    #[instrument(skip_all)]
    async fn store_batch(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        mut batch: HashMap<String, zValue<'_>>,
    ) -> Result<(), LogErr> {
        let time = modio_logger_db::fxtime_ms();
        let timefail = self.timefail.is_timefail();

        // Check all the values and keys, print a debug-friendly log message on the terminal, and
        // then return an early error.
        for (key, value) in &batch {
            if let Err(e) = keys::valid_key(key) {
                warn!("Invalid key for key='{key}' value='{value:?}' err='{e}'");
                return Err(e.into());
            }
            if let Err(e) = valid_metric(value) {
                warn!("Invalid data for key='{key}' value='{value:?}' err='{e}'");
                return Err(e.into());
            }
        }

        // Clone the values and insert into a vec of Metrics to store
        // This converts any floats/ints/bools/strings to string, which is an implementation
        // detail that may change in the future.
        let db_batch: Vec<Metric> = batch
            .iter()
            // Apply "maybe_metric" on the (key, value, timestamp), which returns Option<Metric>.
            // And filter out anything that is None.
            .filter_map(|(key, value)| maybe_metric(key, value, time))
            .collect();

        // If this fails, bail early.
        self.ds
            .insert_bulk(db_batch, timefail)
            .await
            .inspect_err(|e| error!("Failed to insert batch of data to ds. err={:?}", e))?;
        // The signal payload carries i64 timestamps, so truncate the f64 time.
        #[allow(clippy::cast_possible_truncation)]
        let trunc_time = time as i64;

        let payload: Vec<_> = batch
            .drain()
            // Keep all keys that do not start with "modio."
            .filter(|(key, _)| !key.starts_with("modio."))
            // And turn them into a tuple
            .map(|(key, value)| (key, value, trunc_time))
            .collect();

        // Do not send signals if the payload is empty
        if !payload.is_empty() {
            Self::store_signal(&ctxt, payload).await?;
        }

        Ok(())
    }

    /// Store a single metric.
    ///
    /// In reality, it will just call the batch interface and is thus less
    /// efficient than just using a batch.
    ///
    /// However, it helps porting from the old interface.
    #[instrument(skip_all)]
    async fn store(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        key: String,
        value: zValue<'_>,
    ) -> Result<(), LogErr> {
        let batch = HashMap::from([(key, value)]);
        self.store_batch(ctxt, batch).await?;
        Ok(())
    }

    /// This store a series of _rows_ of data for a single key.
    ///
    /// The interface is meant for things that store coherent rows of data in the same form:
    ///    timestamp,  [key1, key2, key3, key4, key5]
    ///    timestamp,  [key1, key2, key3, key4, key5]
    ///    timestamp,  [key1, key2, key3, key4, key5]
    ///
    /// The rows are expected to not change in size, but can come in high resolution compared to
    /// other data, ie. millisecond resolution of timestamps.
    ///
    /// A key is expected to tell the system _first_ that there will be a row and it's format, and
    /// the logger will fail data in case this does not match up with expectations.
    ///
    /// We also expect the data to always be the same size and shape.
    ///
    #[instrument(skip_all)]
    async fn store_rows(&mut self, key: &str, rows: Vec<TimestampRow<'_>>) -> Result<(), LogErr> {
        keys::valid_key(key)?;

        // Check if the key has a registered metadata row, if not, return an error.
        let row_datatypes = self
            .ds
            .metadata_get_row(key)
            .await
            // RowNotFound errors are common here and do not deserve to be logged as error
            .inspect_err(|e| debug!("Failed to get row metadata for key. err={:?}", e))?;

        // Pre-allocate enough data to fit the row we have
        let mut db_batch: Vec<Metric> = Vec::with_capacity(rows.len());

        for (time, row) in rows {
            // Do data validation for the row.
            check_data_row(&row_datatypes, &row)?;

            // Rows are stored as JSON strings under the key's name.
            let value = zbus_row_to_json_string(row)?;
            let m = Metric {
                name: key.to_string(),
                value,
                time,
            };
            db_batch.push(m);
        }
        // NOTE(review): timefail is passed as `false` here — presumably because
        // the rows carry caller-provided timestamps; confirm.
        self.ds
            .insert_bulk(db_batch, false)
            .await
            .inspect_err(|e| error!("Failed to insert bulk data for key. err={:?}", e))?;
        // Row data does not cause signals to happen as DBus is not the proper place to get bulk
        // data every few milliseconds.
        Ok(())
    }

    /// Store a series of metrics with timestamp
    ///
    /// Each metric may cause a signal to be emitted.
    /// Do not use for bulk data.
    #[instrument(skip_all)]
    async fn store_ts(
        &mut self,
        #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
        batch: Vec<(String, zValue<'_>, f64)>,
    ) -> Result<(), LogErr> {
        // Check all the values and keys, print a debug-friendly log message on the terminal, and
        // then return an early error.
        for (key, value, _) in &batch {
            if let Err(e) = keys::valid_key(key) {
                warn!("Invalid key for key='{key}' value='{value:?}' err='{e}'");
                return Err(e.into());
            }
            if let Err(e) = valid_metric(value) {
                warn!("Invalid data for key='{key}' value='{value:?}' err='{e}'");
                return Err(e.into());
            }
        }

        // Clone the values and insert into a vec of Metrics to store
        // This converts any floats/ints/bools/strings to string, which is an implementation
        // detail that may change in the future.
        let db_batch: Vec<Metric> = batch
            .iter()
            // Apply "maybe_metric" on the (key, value, time) which returns Option<Metric>.
            // Then removing all None, leaving a vector of Metric values
            .filter_map(|(key, value, time)| maybe_metric(key, value, *time))
            .collect();

        // If this fails, bail early.
        // NOTE(review): timefail is passed as `false` — presumably because the
        // entries carry caller-provided timestamps; confirm.
        self.ds
            .insert_bulk(db_batch, false)
            .await
            .inspect_err(|e| error!("Failed to insert data with ts. err={:?}", e))?;

        // The signal payload carries i64 timestamps, so truncate each f64 time.
        #[allow(clippy::cast_possible_truncation)]
        let payload: Vec<_> = batch
            .into_iter()
            // Keep all keys that do not start with "modio."
            .filter(|(key, _, _)| !key.starts_with("modio."))
            // And turn them into a tuple
            .map(|(key, value, time)| (key, value, time as i64))
            .collect();

        // Do not send signals if the payload is empty
        if !payload.is_empty() {
            Self::store_signal(&ctxt, payload).await?;
        }
        Ok(())
    }
}
495
496#[cfg(test)]
497mod test {
498    use super::Logger1;
499    use crate::conn::make_connection;
500    use crate::testing::TestServer;
501    use crate::types::Unit;
502    use async_std::task::sleep;
503    use fsipc::logger1::SensorMode;
504    use futures_util::{FutureExt, StreamExt};
505    use modio_logger_db::Datastore;
506    use modio_logger_db::SqlitePoolBuilder;
507    use std::collections::HashMap;
508    use std::error::Error;
509    use std::time::Duration;
510    use tempfile;
511    use test_log::test;
512    use timeout_macro::timeouttest;
513    use zbus::zvariant;
514
    /// End-to-end: set name/description/mode metadata through the D-Bus
    /// interface, persist, reopen the database, and verify both keys (one
    /// internal "modio." key, one customer key) survived.
    #[test(timeouttest)]
    async fn set_metadata_works() -> Result<(), Box<dyn Error>> {
        const PATH: &str = "/se/modio/logger/metadata";
        // Use a real temp file (not :memory:) so the data can be re-opened below.
        let dbfile = tempfile::Builder::new()
            .prefix("set_metadata_works")
            .suffix(".sqlite")
            .tempfile()
            .expect("Error on tempfile");

        // Open the first pool
        let pool = SqlitePoolBuilder::new()
            .db_path(dbfile.path())
            .build()
            .await
            .expect("Error opening database");

        let ds = Datastore::new(pool).await?;
        {
            // As signals require the connection to be active, we set up the connection, object
            // server and logger here.
            let connection = make_connection(true).await?;
            let logger = Logger1::builder()
                .development(true)
                .datastore(ds)
                .build()
                .await?;

            // For signals to work, the logger needs to be registered in the object server of a
            // connection.
            connection.object_server().at(PATH, logger).await?;

            // Take a reference for the interface of our logger & path, so we can get a signal
            // context to use.
            let iface_ref = connection
                .object_server()
                .interface::<_, Logger1>(PATH)
                .await?;

            // Get the logger object back from the server again. Or well, a shimmed one.
            let mut logger = iface_ref.get_mut().await;
            // The signal context is held by the async function until the signal is done, after
            // which it is consumed.
            // So to test, we need a new context for each call.
            let ctx = iface_ref.signal_emitter();
            logger
                .set_metadata_name(
                    ctx.to_owned(),
                    "modio.key.key",
                    "Some internal name".to_string(),
                )
                .await?;
            logger
                .set_metadata_description(
                    ctx.to_owned(),
                    "modio.key.key",
                    "Some internal description".to_string(),
                )
                .await?;

            logger
                .set_metadata_name(
                    ctx.to_owned(),
                    "customer.key.key.key",
                    "Some customer name".to_string(),
                )
                .await?;
            logger
                .set_metadata_mode(
                    ctx.to_owned(),
                    "customer.key.key.key",
                    SensorMode::ReadWrite,
                )
                .await?;
            logger
                .set_metadata_description(
                    ctx.to_owned(),
                    "customer.key.key.key",
                    "Some customer description".to_string(),
                )
                .await?;
            // Flush to disk before dropping the logger / connection.
            logger.persist_data().await;
            drop(logger);
        }
        // Open the pool again
        let pool = SqlitePoolBuilder::new()
            .db_path(dbfile.path())
            .build()
            .await
            .expect("Error opening database");
        let ds = Datastore::new(pool).await?;
        let res = ds.metadata_get_names().await?;
        // Both customer and internal should be available when querying the datastore
        assert!(res.len() == 2);
        eprintln!("{res:?}");
        Ok(())
    }
611
    /// Setting the same unit twice is fine, but replacing an existing unit
    /// with a different one must fail with the "May not replace unit" error.
    #[test(timeouttest)]
    async fn set_unit_override_fails() -> Result<(), Box<dyn Error>> {
        const PATH: &str = "/se/modio/logger/testcase";

        let ds = Datastore::temporary().await;
        {
            // As signals require the connection to be active, we set up the connection, object
            // server and logger here.
            let connection = make_connection(true).await?;
            let logger = Logger1::builder()
                .development(true)
                .datastore(ds)
                .build()
                .await?;
            // For signals to work, the logger needs to be registered in the object server of a
            // connection.
            connection.object_server().at(PATH, logger).await?;

            // Take a reference for the interface of our logger & path, so we can get a signal
            // context to use.
            let iface_ref = connection
                .object_server()
                .interface::<_, Logger1>(PATH)
                .await?;
            // Get the logger object back from the server again. Or well, a shimmed one.
            let mut logger = iface_ref.get_mut().await;
            // The signal context is held by the async function until the signal is done, after
            // which it is consumed.
            // So to test, we need a new context for each call.
            let ctx = iface_ref.signal_emitter();
            // NOTE(review): the name is set on "customer.key.key" while the unit
            // calls below use "customer.key.key.key" — confirm the differing key
            // is intentional.
            logger
                .set_metadata_name(
                    ctx.to_owned(),
                    "customer.key.key",
                    "Some customer name".to_string(),
                )
                .await?;

            logger
                .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
                .await?;

            // Same unit once more. should work
            logger
                .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
                .await?;

            // New unit. should fail
            let res = logger
                .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("m"))
                .await;
            drop(logger);
            let e = res.expect_err("should not be able to set it twice");
            assert!(e.to_string().contains("May not replace unit"));
        }
        Ok(())
    }
669
    /// A key starts with no mode metadata; after `set_metadata_mode` the mode
    /// reads back as `ReadOnly`.
    #[test(timeouttest)]
    async fn set_read_only() -> Result<(), Box<dyn Error>> {
        const PATH: &str = "/se/modio/logger/testcase/set_read_only";
        let ds = Datastore::temporary().await;
        {
            // As signals require the connection to be active, we set up the connection, object
            // server and logger here.
            let connection = make_connection(true).await?;
            let logger = Logger1::builder()
                .development(true)
                .datastore(ds)
                .build()
                .await?;

            // For signals to work, the logger needs to be registered in the object server of a
            // connection.
            connection.object_server().at(PATH, logger).await?;

            // Take a reference for the interface of our logger & path, so we can get a signal
            // context to use.
            let iface_ref = connection
                .object_server()
                .interface::<_, Logger1>(PATH)
                .await?;
            // Get the logger object back from the server again. Or well, a shimmed one.
            let mut logger = iface_ref.get_mut().await;

            let ctx = iface_ref.signal_emitter();
            logger
                .set_metadata_name(
                    ctx.to_owned(),
                    "customer.key.key",
                    "Some customer name".to_string(),
                )
                .await?;
            // Freshly-named key has no mode yet.
            let res = logger.get_metadata("customer.key.key").await?;
            assert!(res.mode.is_none());
            logger
                .set_metadata_mode(ctx.to_owned(), "customer.key.key", SensorMode::ReadOnly)
                .await?;
            let res = logger.get_metadata("customer.key.key").await?;
            drop(logger);
            assert_eq!(res.mode, Some(SensorMode::ReadOnly));
        }
        Ok(())
    }
716
717    #[test(timeouttest)]
718    async fn no_modio_signals_in_batch() -> Result<(), Box<dyn Error>> {
719        let server = TestServer::new(line!()).await?;
720        let logger1 = server.logger1().await?;
721        let mut stream = logger1.receive_store_signal().await?;
722
723        // Send a transaction
724        let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
725        batch.insert("test.test.string".into(), String::from("string").into());
726        batch.insert("test.test.int".into(), (42_u64).into());
727        batch.insert("test.test.float".into(), (0.3_f64).into());
728        batch.insert("modio.test.bool.true".into(), (true).into());
729        batch.insert("modio.test.bool.false".into(), (false).into());
730        logger1.store_batch(batch).await?;
731
732        // Read signal, it should only be one, with a batch of data.
733        let sig = stream.next().await.unwrap();
734        let payload = sig.args()?;
735        assert!(payload.batch.len() == 3, "Should have three out of 4 keys");
736        // The modio internal keys should not be stored
737        for (key, _, _) in payload.batch {
738            assert!(key.starts_with("test.test"));
739        }
740        // We should not have more signals waiting
741        let last_signal = stream.next().now_or_never();
742        assert!(last_signal.is_none());
743
744        let ipc = server.proxy().await?;
745        {
746            let m = ipc.retrieve("test.test.string").await?;
747            assert_eq!(m.key, "test.test.string");
748            assert_eq!(m.value, "string");
749        }
750        {
751            let m = ipc.retrieve("test.test.int").await?;
752            assert_eq!(m.key, "test.test.int");
753            assert_eq!(m.value, "42");
754        }
755
756        {
757            let m = ipc.retrieve("test.test.float").await?;
758            assert_eq!(m.key, "test.test.float");
759            assert_eq!(m.value, "0.3");
760        }
761
762        {
763            let m = ipc.retrieve("modio.test.bool.true").await?;
764            assert_eq!(m.key, "modio.test.bool.true");
765            assert_eq!(m.value, "1");
766        }
767
768        {
769            let m = ipc.retrieve("modio.test.bool.false").await?;
770            assert_eq!(m.key, "modio.test.bool.false");
771            assert_eq!(m.value, "0");
772        }
773
774        Ok(())
775    }
776
777    #[test(timeouttest)]
778    async fn modio_only_no_signals() -> Result<(), Box<dyn Error>> {
779        let server = TestServer::new(line!()).await?;
780        let logger1 = server.logger1().await?;
781        let mut stream = logger1.receive_store_signal().await?;
782
783        // Send a transaction
784        let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
785        batch.insert("modio.test.bool.true".into(), (true).into());
786        batch.insert("modio.test.bool.false".into(), (false).into());
787        logger1.store_batch(batch).await?;
788
789        // We should not have any signals waiting
790        assert!(
791            stream.next().now_or_never().is_none(),
792            "Should have no signals pending"
793        );
794        Ok(())
795    }
796
    /// Storing metrics one at a time yields one signal per non-"modio." key
    /// (three signals, each with a single-entry batch) and no signals for the
    /// internal keys.
    #[test(timeouttest)]
    async fn store_singles_work_as_well() -> Result<(), Box<dyn Error>> {
        let server = TestServer::new(line!()).await?;
        let logger1 = server.logger1().await?;
        let mut stream = logger1.receive_store_signal().await?;
        logger1
            .store("test.test.string".into(), String::from("string").into())
            .await?;
        logger1
            .store("test.test.int".into(), (42_u64).into())
            .await?;
        logger1
            .store("test.test.float".into(), (0.3_f64).into())
            .await?;
        logger1
            .store("modio.test.bool.true".into(), (true).into())
            .await?;
        logger1
            .store("modio.test.bool.false".into(), (false).into())
            .await?;

        // Read the three signals (one per non-modio store), each carrying one entry.
        for _ in 0..3 {
            let sig = stream.next().await.unwrap();
            let payload = sig.args()?;
            // Maybe we should allow some buffering in the store signals too?
            assert!(payload.batch.len() == 1, "May have 1 after single stores");
            // The modio internal keys should not be part of the signal payload
            for (key, _, _) in payload.batch {
                assert!(key.starts_with("test.test"));
            }
        }
        // We should not have more signals waiting
        assert!(
            stream.next().now_or_never().is_none(),
            "Should have no signals pending"
        );
        Ok(())
    }
836
837    #[test(async_std::test)]
838    async fn store_misc_with_ts_works() -> Result<(), Box<dyn Error>> {
839        let server = TestServer::new(line!()).await?;
840        let logger1 = server.logger1().await?;
841        let mut stream = logger1.receive_store_signal().await?;
842
843        // Ugly helper to make a tuple of test-data that goes in the interface
844        fn tst<'a>(
845            key: &str,
846            val: impl Into<zvariant::Value<'a>>,
847            ts: f64,
848        ) -> (String, zvariant::Value<'a>, f64) {
849            (key.into(), val.into(), ts)
850        }
851
852        let values = vec![
853            tst("test.test.string", "string", 1708017581.0),
854            tst("test.test.int", 42_u64, 1708017581.1),
855            tst("test.test.float", 0.3_f64, 1708017581.2),
856            tst("modio.test.bool.true", true, 1708017581.3),
857            tst("modio.test.bool.false", false, 1708017581.4),
858        ];
859        logger1.store_ts(values).await?;
860
861        // Read signal, it should only be one, with a batch of data.
862        let sig = stream.next().await.unwrap();
863        let payload = sig.args()?;
864        assert!(
865            payload.batch.len() == 3,
866            "Should have three sets in the payload"
867        );
868        // The modio internal keys should not be stored
869        for (key, _, _) in payload.batch {
870            assert!(key.starts_with("test.test"));
871        }
872        // We should not have more signals waiting
873        assert!(
874            stream.next().now_or_never().is_none(),
875            "Should have no signals pending"
876        );
877        Ok(())
878    }
879
880    #[test(async_std::test)]
881    async fn store_rows_works() -> Result<(), Box<dyn Error>> {
882        use zbus::zvariant::Value;
883
884        let server = TestServer::new(line!()).await?;
885        let logger1 = server.logger1().await?;
886        let mut stream = logger1.receive_store_signal().await?;
887
888        fn cast(v: &[(f64, [f64; 7])]) -> Vec<(f64, Vec<Value<'static>>)> {
889            v.iter()
890                .map(|(ts, vals)| (*ts, vals.iter().map(|x| Value::from(*x)).collect()))
891                .collect()
892        }
893        // Send a transaction
894        let raw_rows = [
895            (1707329936.0, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
896            (1707329936.1, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
897            (1707329936.2, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
898            (1707329936.3, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
899            (1707329936.4, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
900            (1707329936.5, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
901            (1707329936.6, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
902            (1707329936.7, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
903            (1707329936.8, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
904            (1707329936.9, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
905        ];
906        let rows = cast(&raw_rows);
907        let rows2 = cast(&raw_rows);
908
909        let res1 = logger1.store_rows("test.rows.key", rows2).await;
910        res1.expect_err("Should be an error that test.rows.key is not prepared");
911        let row_items = vec![
912            ("nil", "f64"),
913            ("First_name", "f64"),
914            ("Second", "f64"),
915            ("Thirdly", "f64"),
916            ("other", "f64"),
917            ("fifthly", "f64"),
918            ("sixer", "f64"),
919        ];
920        logger1.set_metadata_row("test.rows.key", row_items).await?;
921
922        let res1 = logger1.store_rows("test.rows.key", rows).await;
923        res1.expect("Should be ok because test.rows.key is prepared");
924        // We should not have more signals waiting
925        assert!(
926            stream.next().now_or_never().is_none(),
927            "Should have no signals pending"
928        );
929        Ok(())
930    }
931
932    #[test(async_std::test)]
933    async fn row_metadata_checks() -> Result<(), Box<dyn Error>> {
934        let server = TestServer::new(line!()).await?;
935        let logger1 = server.logger1().await?;
936
937        let mut row_items = vec![
938            ("nil", "f64"),
939            ("First_name", "string"),
940            ("Second", "u64"),
941            ("liar", "bool"),
942        ];
943
944        logger1
945            .set_metadata_row("test.rows.meta", row_items.clone())
946            .await?;
947        // Setting row items twice is expected to work
948        logger1
949            .set_metadata_row("test.rows.meta", row_items.clone())
950            .await?;
951
952        row_items.push(("this fails", "f64"));
953        // Setting row items to a different thing should fail
954        logger1
955            .set_metadata_row("test.rows.meta", row_items)
956            .await
957            .expect_err("Expected a failure but got success");
958        Ok(())
959    }
960    #[test(timeouttest)]
961    async fn timestamp_on_update() -> Result<(), Box<dyn Error>> {
962        let server = TestServer::new(line!()).await?;
963        let logger1 = server.logger1().await?;
964        let ipc = server.proxy().await?;
965        let second = Duration::from_secs(1);
966        ipc.retrieve("modio.logger.metadata.change")
967            .await
968            .expect_err("Should have no data");
969
970        logger1
971            .set_metadata_name("customer.key.key", "Some customer name")
972            .await
973            .expect("Should be able to set name");
974        let res = logger1
975            .get_metadata("customer.key.key")
976            .await
977            .expect("Should have metadata");
978        assert!(res.name.is_some());
979        let metric = ipc
980            .retrieve("modio.logger.metadata.change")
981            .await
982            .expect("Should have data for metadata changed");
983
984        // Need to have a delay as the Measures are truncated to second precision
985        sleep(second).await;
986
987        logger1
988            .set_metadata_name("customer.key.key", "Some customer name")
989            .await
990            .expect("Should be able to set name to the same thing");
991        let metric2 = ipc
992            .retrieve("modio.logger.metadata.change")
993            .await
994            .expect("Should have data for metadata changed");
995        assert_eq!(
996            metric.timestamp, metric2.timestamp,
997            "Timestamp should not change when no metadata updated."
998        );
999        // Need to have a delay as the Measures are truncated to second precision
1000        sleep(second).await;
1001        logger1
1002            .set_metadata_name("customer.key.key", "Some OTHER name")
1003            .await
1004            .expect("Should be able to change the name");
1005        let metric3 = ipc
1006            .retrieve("modio.logger.metadata.change")
1007            .await
1008            .expect("Should have data for metadata changed");
1009        assert!(
1010            metric.timestamp < metric3.timestamp,
1011            "Timestamp should have updated"
1012        );
1013        Ok(())
1014    }
1015}