1use crate::logger::{keys, LogErr};
4use crate::timefail;
5use crate::types::Metadata;
6use crate::types::Unit;
7use crate::types::ValueMap;
8use crate::values::{
9 check_data_row, valid_metric, value_to_string, zbus_row_to_json_string, TimestampRow,
10};
11use fsipc::logger1::SensorMode;
12use modio_logger_db::{Datastore, Metric};
13use std::collections::HashMap;
14use tracing::{debug, error, info, instrument, warn};
15use zbus::zvariant::Value as zValue;
16use zbus::{interface, object_server::SignalEmitter};
17mod builder;
18pub use builder::Builder;
19
20pub struct Logger1 {
21 ds: Datastore,
22 timefail: timefail::Timefail,
23}
24
25impl Logger1 {
26 pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
27 if timefail.is_timefail() {
28 info!("We are currently in TIMEFAIL mode.");
29 let count = ds.transaction_fail_pending().await?;
30 if count > 0 {
31 info!("Failed {count} pending change requests due to TIMEFAIL");
32 }
33 }
34 Ok(Self { ds, timefail })
35 }
36 #[must_use]
37 pub const fn builder() -> Builder {
38 Builder::new()
39 }
40
41 #[cfg(test)]
42 pub(crate) async fn persist_data(&self) {
43 self.ds
44 .persist_data()
45 .await
46 .expect("Failed to persist data");
47 }
48}
49
50#[derive(Debug, Copy, Clone, serde::Serialize, serde::Deserialize, zbus::zvariant::Type)]
55#[zvariant(signature = "s")] struct DataTypeWrp(modio_logger_db::DataType);
58
59#[derive(Debug, serde::Serialize, serde::Deserialize, zbus::zvariant::Type)]
60struct RowItem {
61 name: String,
62 datatype: DataTypeWrp,
63}
64
65#[must_use]
66fn maybe_metric(key: impl Into<String>, val: &zValue<'_>, time: f64) -> Option<Metric> {
67 value_to_string(val).map(|value| Metric {
68 name: key.into(),
69 value,
70 time,
71 })
72}
73
74fn metadata_change_metric() -> Metric {
75 let key = "modio.logger.metadata.change";
76 assert!(
77 keys::valid_key(key).is_ok(),
78 "Hardcoded key name should be valid"
79 );
80 let time = modio_logger_db::fxtime_ms();
81 Metric {
82 name: key.into(),
83 value: "1".into(),
84 time,
85 }
86}
87
88impl Logger1 {
89 async fn metadata_last_changed(
93 &self,
94 ctxt: &SignalEmitter<'_>,
95 key: &str,
96 ) -> Result<(), LogErr> {
97 let timefail = self.timefail.is_timefail();
98 let metric = metadata_change_metric();
99 let batch = vec![metric];
100 self.ds.insert_bulk(batch, timefail).await?;
101 Self::metadata_updated(ctxt, key).await?;
102 Ok(())
103 }
104}
105
106#[allow(clippy::use_self)]
107#[interface(name = "se.modio.logger.Logger1")]
108impl Logger1 {
109 #[zbus(signal)]
111 async fn metadata_updated(ctxt: &SignalEmitter<'_>, key: &str) -> zbus::Result<()>;
112
113 #[instrument(skip(self, ctxt))]
115 async fn set_metadata_name(
116 &mut self,
117 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
118 key: &str,
119 name: String,
120 ) -> Result<(), LogErr> {
121 if self
122 .ds
123 .metadata_set_name(key, &name)
124 .await
125 .inspect_err(|e| error!("Failed to set metadata name err={:?}", e))?
126 {
127 info!("Updated name of key={} to name='{}'", &key, &name);
128 self.metadata_last_changed(&ctxt, key).await?;
129 }
130 Ok(())
131 }
132
133 #[instrument(skip(self, ctxt))]
134 async fn set_metadata_description(
135 &mut self,
136 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
137 key: &str,
138 description: String,
139 ) -> Result<(), LogErr> {
140 if self
141 .ds
142 .metadata_set_description(key, &description)
143 .await
144 .inspect_err(|e| error!("Failed to set metadata description err={:?}", e))?
145 {
146 info!(
147 "Updated description of key={} to description='{}'",
148 &key, &description
149 );
150 self.metadata_last_changed(&ctxt, key).await?;
151 }
152 Ok(())
153 }
154
155 #[instrument(skip(self, ctxt))]
166 async fn set_metadata_row(
167 &mut self,
168 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
169 key: &str,
170 row_items: Vec<RowItem>,
171 ) -> Result<(), LogErr> {
172 let row_metadata: Vec<modio_logger_db::DataType> =
174 row_items.iter().map(|x| x.datatype.0).collect();
175
176 let desc = serde_json::to_string(&row_items)?;
178
179 self.ds
182 .metadata_set_row(key, row_metadata)
183 .await
184 .inspect_err(|e| error!("Failed to set metadata row err={:?}", e))?;
185 info!("Updated row metadata of key='{}'", key);
186
187 if self
188 .ds
189 .metadata_set_description(key, &desc)
190 .await
191 .inspect_err(|e| error!("Failed to set metadata description err={:?}", e))?
192 {
193 info!(
194 "Updated description of key={} to auto generated description='{}'",
195 &key, &desc
196 );
197 self.metadata_last_changed(&ctxt, key).await?;
198 }
199 Ok(())
200 }
201
202 #[instrument(skip(self, ctxt))]
203 async fn set_metadata_mode(
204 &mut self,
205 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
206 key: &str,
207 mode: SensorMode,
208 ) -> Result<(), LogErr> {
209 let new_mode: modio_logger_db::SensorMode = mode.into();
210 if self
211 .ds
212 .metadata_set_mode(key, &new_mode)
213 .await
214 .inspect_err(|e| error!("Failed to set metadata mode err={:?}", e))?
215 {
216 info!("Updated mode of key={} to mode='{:?}'", &key, &new_mode);
217 self.metadata_last_changed(&ctxt, key).await?;
218 }
219 Ok(())
220 }
221
222 #[instrument(skip(self, ctxt))]
223 async fn set_metadata_value_map(
224 &mut self,
225 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
226 key: &str,
227 value_map: ValueMap,
228 ) -> Result<(), LogErr> {
229 if self
230 .ds
231 .metadata_set_enum(key, &value_map)
232 .await
233 .inspect_err(|e| error!("Failed to set metadata enum err={:?}", e))?
234 {
235 info!(
236 "Updated metadata of key={} to enum='{:?}'",
237 &key, &value_map
238 );
239 self.metadata_last_changed(&ctxt, key).await?;
240 }
241 Ok(())
242 }
243
244 #[instrument(skip(self, ctxt))]
245 async fn set_metadata_unit(
246 &mut self,
247 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
248 key: &str,
249 unit: String,
250 ) -> Result<(), LogErr> {
251 use crate::types::UnitError;
252 use std::convert::TryFrom;
253
254 let unit = Unit::try_from(unit)?.into_inner();
256
257 let res = self.ds.metadata_set_unit(key, &unit).await;
258
259 match res {
266 Ok(true) => {
268 info!("Updated unit of key={} to unit={}", &key, &unit);
269 self.metadata_last_changed(&ctxt, key).await?;
270 Ok(())
271 }
272 Ok(false) => Ok(()),
274 Err(modio_logger_db::Error::Unique { source }) => {
275 debug!("Throwing away database error: {:?}", source);
276 Err(UnitError::Unique.into())
277 }
278 Err(e) => {
279 error!("Failed to set metadata unit. err={:?}", e);
280 Err(e.into())
281 }
282 }
283 }
284
285 #[instrument(skip(self))]
286 async fn get_metadata(&mut self, key: &str) -> Result<Metadata, LogErr> {
287 info!("Fetching metadata for key={}", &key);
290 match self.ds.get_metadata(key).await {
291 Ok(None) => Err(LogErr::NoMetadata),
292 Ok(Some(data)) => Ok(Metadata::from(data)),
293 Err(e @ modio_logger_db::Error::NotFound { .. }) => {
295 debug!(
296 "Requested metadata for non-existing key. key={}, err={:?}",
297 key, e
298 );
299 Err(e.into())
300 }
301 Err(e) => {
302 error!("Failed to fetch metadata from db. err={:?}", e);
303 Err(e.into())
304 }
305 }
306 }
307
308 #[zbus(signal)]
310 async fn store_signal(
311 ctxt: &SignalEmitter<'_>,
312 batch: Vec<(String, zValue<'_>, i64)>,
313 ) -> zbus::Result<()>;
314
315 #[instrument(skip_all)]
316 async fn store_batch(
317 &mut self,
318 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
319 mut batch: HashMap<String, zValue<'_>>,
320 ) -> Result<(), LogErr> {
321 let time = modio_logger_db::fxtime_ms();
322 let timefail = self.timefail.is_timefail();
323
324 for (key, value) in &batch {
327 if let Err(e) = keys::valid_key(key) {
328 warn!("Invalid key for key='{key}' value='{value:?}' err='{e}'");
329 return Err(e.into());
330 }
331 if let Err(e) = valid_metric(value) {
332 warn!("Invalid data for key='{key}' value='{value:?}' err='{e}'");
333 return Err(e.into());
334 }
335 }
336
337 let db_batch: Vec<Metric> = batch
341 .iter()
342 .filter_map(|(key, value)| maybe_metric(key, value, time))
345 .collect();
346
347 self.ds
349 .insert_bulk(db_batch, timefail)
350 .await
351 .inspect_err(|e| error!("Failed to insert batch of data to ds. err={:?}", e))?;
352 #[allow(clippy::cast_possible_truncation)]
353 let trunc_time = time as i64;
354
355 let payload: Vec<_> = batch
356 .drain()
357 .filter(|(key, _)| !key.starts_with("modio."))
359 .map(|(key, value)| (key, value, trunc_time))
361 .collect();
362
363 if !payload.is_empty() {
365 Self::store_signal(&ctxt, payload).await?;
366 }
367
368 Ok(())
369 }
370
371 #[instrument(skip_all)]
378 async fn store(
379 &mut self,
380 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
381 key: String,
382 value: zValue<'_>,
383 ) -> Result<(), LogErr> {
384 let batch = HashMap::from([(key, value)]);
385 self.store_batch(ctxt, batch).await?;
386 Ok(())
387 }
388
389 #[instrument(skip_all)]
405 async fn store_rows(&mut self, key: &str, rows: Vec<TimestampRow<'_>>) -> Result<(), LogErr> {
406 keys::valid_key(key)?;
407
408 let row_datatypes = self
410 .ds
411 .metadata_get_row(key)
412 .await
413 .inspect_err(|e| debug!("Failed to get row metadata for key. err={:?}", e))?;
415
416 let mut db_batch: Vec<Metric> = Vec::with_capacity(rows.len());
418
419 for (time, row) in rows {
420 check_data_row(&row_datatypes, &row)?;
422
423 let value = zbus_row_to_json_string(row)?;
424 let m = Metric {
425 name: key.to_string(),
426 value,
427 time,
428 };
429 db_batch.push(m);
430 }
431 self.ds
432 .insert_bulk(db_batch, false)
433 .await
434 .inspect_err(|e| error!("Failed to insert bulk data for key. err={:?}", e))?;
435 Ok(())
438 }
439
440 #[instrument(skip_all)]
445 async fn store_ts(
446 &mut self,
447 #[zbus(signal_context)] ctxt: SignalEmitter<'_>,
448 batch: Vec<(String, zValue<'_>, f64)>,
449 ) -> Result<(), LogErr> {
450 for (key, value, _) in &batch {
453 if let Err(e) = keys::valid_key(key) {
454 warn!("Invalid key for key='{key}' value='{value:?}' err='{e}'");
455 return Err(e.into());
456 }
457 if let Err(e) = valid_metric(value) {
458 warn!("Invalid data for key='{key}' value='{value:?}' err='{e}'");
459 return Err(e.into());
460 }
461 }
462
463 let db_batch: Vec<Metric> = batch
467 .iter()
468 .filter_map(|(key, value, time)| maybe_metric(key, value, *time))
471 .collect();
472
473 self.ds
475 .insert_bulk(db_batch, false)
476 .await
477 .inspect_err(|e| error!("Failed to insert data with ts. err={:?}", e))?;
478
479 #[allow(clippy::cast_possible_truncation)]
480 let payload: Vec<_> = batch
481 .into_iter()
482 .filter(|(key, _, _)| !key.starts_with("modio."))
484 .map(|(key, value, time)| (key, value, time as i64))
486 .collect();
487
488 if !payload.is_empty() {
490 Self::store_signal(&ctxt, payload).await?;
491 }
492 Ok(())
493 }
494}
495
496#[cfg(test)]
497mod test {
498 use super::Logger1;
499 use crate::conn::make_connection;
500 use crate::testing::TestServer;
501 use crate::types::Unit;
502 use async_std::task::sleep;
503 use fsipc::logger1::SensorMode;
504 use futures_util::{FutureExt, StreamExt};
505 use modio_logger_db::Datastore;
506 use modio_logger_db::SqlitePoolBuilder;
507 use std::collections::HashMap;
508 use std::error::Error;
509 use std::time::Duration;
510 use tempfile;
511 use test_log::test;
512 use timeout_macro::timeouttest;
513 use zbus::zvariant;
514
515 #[test(timeouttest)]
516 async fn set_metadata_works() -> Result<(), Box<dyn Error>> {
517 const PATH: &str = "/se/modio/logger/metadata";
518 let dbfile = tempfile::Builder::new()
519 .prefix("set_metadata_works")
520 .suffix(".sqlite")
521 .tempfile()
522 .expect("Error on tempfile");
523
524 let pool = SqlitePoolBuilder::new()
526 .db_path(dbfile.path())
527 .build()
528 .await
529 .expect("Error opening database");
530
531 let ds = Datastore::new(pool).await?;
532 {
533 let connection = make_connection(true).await?;
536 let logger = Logger1::builder()
537 .development(true)
538 .datastore(ds)
539 .build()
540 .await?;
541
542 connection.object_server().at(PATH, logger).await?;
545
546 let iface_ref = connection
549 .object_server()
550 .interface::<_, Logger1>(PATH)
551 .await?;
552
553 let mut logger = iface_ref.get_mut().await;
555 let ctx = iface_ref.signal_emitter();
559 logger
560 .set_metadata_name(
561 ctx.to_owned(),
562 "modio.key.key",
563 "Some internal name".to_string(),
564 )
565 .await?;
566 logger
567 .set_metadata_description(
568 ctx.to_owned(),
569 "modio.key.key",
570 "Some internal description".to_string(),
571 )
572 .await?;
573
574 logger
575 .set_metadata_name(
576 ctx.to_owned(),
577 "customer.key.key.key",
578 "Some customer name".to_string(),
579 )
580 .await?;
581 logger
582 .set_metadata_mode(
583 ctx.to_owned(),
584 "customer.key.key.key",
585 SensorMode::ReadWrite,
586 )
587 .await?;
588 logger
589 .set_metadata_description(
590 ctx.to_owned(),
591 "customer.key.key.key",
592 "Some customer description".to_string(),
593 )
594 .await?;
595 logger.persist_data().await;
596 drop(logger);
597 }
598 let pool = SqlitePoolBuilder::new()
600 .db_path(dbfile.path())
601 .build()
602 .await
603 .expect("Error opening database");
604 let ds = Datastore::new(pool).await?;
605 let res = ds.metadata_get_names().await?;
606 assert!(res.len() == 2);
608 eprintln!("{res:?}");
609 Ok(())
610 }
611
612 #[test(timeouttest)]
613 async fn set_unit_override_fails() -> Result<(), Box<dyn Error>> {
614 const PATH: &str = "/se/modio/logger/testcase";
615
616 let ds = Datastore::temporary().await;
617 {
618 let connection = make_connection(true).await?;
621 let logger = Logger1::builder()
622 .development(true)
623 .datastore(ds)
624 .build()
625 .await?;
626 connection.object_server().at(PATH, logger).await?;
629
630 let iface_ref = connection
633 .object_server()
634 .interface::<_, Logger1>(PATH)
635 .await?;
636 let mut logger = iface_ref.get_mut().await;
638 let ctx = iface_ref.signal_emitter();
642 logger
643 .set_metadata_name(
644 ctx.to_owned(),
645 "customer.key.key",
646 "Some customer name".to_string(),
647 )
648 .await?;
649
650 logger
651 .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
652 .await?;
653
654 logger
656 .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
657 .await?;
658
659 let res = logger
661 .set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("m"))
662 .await;
663 drop(logger);
664 let e = res.expect_err("should not be able to set it twice");
665 assert!(e.to_string().contains("May not replace unit"));
666 }
667 Ok(())
668 }
669
670 #[test(timeouttest)]
671 async fn set_read_only() -> Result<(), Box<dyn Error>> {
672 const PATH: &str = "/se/modio/logger/testcase/set_read_only";
673 let ds = Datastore::temporary().await;
674 {
675 let connection = make_connection(true).await?;
678 let logger = Logger1::builder()
679 .development(true)
680 .datastore(ds)
681 .build()
682 .await?;
683
684 connection.object_server().at(PATH, logger).await?;
687
688 let iface_ref = connection
691 .object_server()
692 .interface::<_, Logger1>(PATH)
693 .await?;
694 let mut logger = iface_ref.get_mut().await;
696
697 let ctx = iface_ref.signal_emitter();
698 logger
699 .set_metadata_name(
700 ctx.to_owned(),
701 "customer.key.key",
702 "Some customer name".to_string(),
703 )
704 .await?;
705 let res = logger.get_metadata("customer.key.key").await?;
706 assert!(res.mode.is_none());
707 logger
708 .set_metadata_mode(ctx.to_owned(), "customer.key.key", SensorMode::ReadOnly)
709 .await?;
710 let res = logger.get_metadata("customer.key.key").await?;
711 drop(logger);
712 assert_eq!(res.mode, Some(SensorMode::ReadOnly));
713 }
714 Ok(())
715 }
716
717 #[test(timeouttest)]
718 async fn no_modio_signals_in_batch() -> Result<(), Box<dyn Error>> {
719 let server = TestServer::new(line!()).await?;
720 let logger1 = server.logger1().await?;
721 let mut stream = logger1.receive_store_signal().await?;
722
723 let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
725 batch.insert("test.test.string".into(), String::from("string").into());
726 batch.insert("test.test.int".into(), (42_u64).into());
727 batch.insert("test.test.float".into(), (0.3_f64).into());
728 batch.insert("modio.test.bool.true".into(), (true).into());
729 batch.insert("modio.test.bool.false".into(), (false).into());
730 logger1.store_batch(batch).await?;
731
732 let sig = stream.next().await.unwrap();
734 let payload = sig.args()?;
735 assert!(payload.batch.len() == 3, "Should have three out of 4 keys");
736 for (key, _, _) in payload.batch {
738 assert!(key.starts_with("test.test"));
739 }
740 let last_signal = stream.next().now_or_never();
742 assert!(last_signal.is_none());
743
744 let ipc = server.proxy().await?;
745 {
746 let m = ipc.retrieve("test.test.string").await?;
747 assert_eq!(m.key, "test.test.string");
748 assert_eq!(m.value, "string");
749 }
750 {
751 let m = ipc.retrieve("test.test.int").await?;
752 assert_eq!(m.key, "test.test.int");
753 assert_eq!(m.value, "42");
754 }
755
756 {
757 let m = ipc.retrieve("test.test.float").await?;
758 assert_eq!(m.key, "test.test.float");
759 assert_eq!(m.value, "0.3");
760 }
761
762 {
763 let m = ipc.retrieve("modio.test.bool.true").await?;
764 assert_eq!(m.key, "modio.test.bool.true");
765 assert_eq!(m.value, "1");
766 }
767
768 {
769 let m = ipc.retrieve("modio.test.bool.false").await?;
770 assert_eq!(m.key, "modio.test.bool.false");
771 assert_eq!(m.value, "0");
772 }
773
774 Ok(())
775 }
776
777 #[test(timeouttest)]
778 async fn modio_only_no_signals() -> Result<(), Box<dyn Error>> {
779 let server = TestServer::new(line!()).await?;
780 let logger1 = server.logger1().await?;
781 let mut stream = logger1.receive_store_signal().await?;
782
783 let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
785 batch.insert("modio.test.bool.true".into(), (true).into());
786 batch.insert("modio.test.bool.false".into(), (false).into());
787 logger1.store_batch(batch).await?;
788
789 assert!(
791 stream.next().now_or_never().is_none(),
792 "Should have no signals pending"
793 );
794 Ok(())
795 }
796
797 #[test(timeouttest)]
798 async fn store_singles_work_as_well() -> Result<(), Box<dyn Error>> {
799 let server = TestServer::new(line!()).await?;
800 let logger1 = server.logger1().await?;
801 let mut stream = logger1.receive_store_signal().await?;
802 logger1
803 .store("test.test.string".into(), String::from("string").into())
804 .await?;
805 logger1
806 .store("test.test.int".into(), (42_u64).into())
807 .await?;
808 logger1
809 .store("test.test.float".into(), (0.3_f64).into())
810 .await?;
811 logger1
812 .store("modio.test.bool.true".into(), (true).into())
813 .await?;
814 logger1
815 .store("modio.test.bool.false".into(), (false).into())
816 .await?;
817
818 for _ in 0..3 {
820 let sig = stream.next().await.unwrap();
821 let payload = sig.args()?;
822 assert!(payload.batch.len() == 1, "May have 1 after single stores");
824 for (key, _, _) in payload.batch {
826 assert!(key.starts_with("test.test"));
827 }
828 }
829 assert!(
831 stream.next().now_or_never().is_none(),
832 "Should have no signals pending"
833 );
834 Ok(())
835 }
836
837 #[test(async_std::test)]
838 async fn store_misc_with_ts_works() -> Result<(), Box<dyn Error>> {
839 let server = TestServer::new(line!()).await?;
840 let logger1 = server.logger1().await?;
841 let mut stream = logger1.receive_store_signal().await?;
842
843 fn tst<'a>(
845 key: &str,
846 val: impl Into<zvariant::Value<'a>>,
847 ts: f64,
848 ) -> (String, zvariant::Value<'a>, f64) {
849 (key.into(), val.into(), ts)
850 }
851
852 let values = vec![
853 tst("test.test.string", "string", 1708017581.0),
854 tst("test.test.int", 42_u64, 1708017581.1),
855 tst("test.test.float", 0.3_f64, 1708017581.2),
856 tst("modio.test.bool.true", true, 1708017581.3),
857 tst("modio.test.bool.false", false, 1708017581.4),
858 ];
859 logger1.store_ts(values).await?;
860
861 let sig = stream.next().await.unwrap();
863 let payload = sig.args()?;
864 assert!(
865 payload.batch.len() == 3,
866 "Should have three sets in the payload"
867 );
868 for (key, _, _) in payload.batch {
870 assert!(key.starts_with("test.test"));
871 }
872 assert!(
874 stream.next().now_or_never().is_none(),
875 "Should have no signals pending"
876 );
877 Ok(())
878 }
879
880 #[test(async_std::test)]
881 async fn store_rows_works() -> Result<(), Box<dyn Error>> {
882 use zbus::zvariant::Value;
883
884 let server = TestServer::new(line!()).await?;
885 let logger1 = server.logger1().await?;
886 let mut stream = logger1.receive_store_signal().await?;
887
888 fn cast(v: &[(f64, [f64; 7])]) -> Vec<(f64, Vec<Value<'static>>)> {
889 v.iter()
890 .map(|(ts, vals)| (*ts, vals.iter().map(|x| Value::from(*x)).collect()))
891 .collect()
892 }
893 let raw_rows = [
895 (1707329936.0, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
896 (1707329936.1, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
897 (1707329936.2, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
898 (1707329936.3, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
899 (1707329936.4, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
900 (1707329936.5, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
901 (1707329936.6, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
902 (1707329936.7, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
903 (1707329936.8, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
904 (1707329936.9, [0.0, 1.1, 2.2, 3.3, 4.4, 5.5, 6.6]),
905 ];
906 let rows = cast(&raw_rows);
907 let rows2 = cast(&raw_rows);
908
909 let res1 = logger1.store_rows("test.rows.key", rows2).await;
910 res1.expect_err("Should be an error that test.rows.key is not prepared");
911 let row_items = vec![
912 ("nil", "f64"),
913 ("First_name", "f64"),
914 ("Second", "f64"),
915 ("Thirdly", "f64"),
916 ("other", "f64"),
917 ("fifthly", "f64"),
918 ("sixer", "f64"),
919 ];
920 logger1.set_metadata_row("test.rows.key", row_items).await?;
921
922 let res1 = logger1.store_rows("test.rows.key", rows).await;
923 res1.expect("Should be ok because test.rows.key is prepared");
924 assert!(
926 stream.next().now_or_never().is_none(),
927 "Should have no signals pending"
928 );
929 Ok(())
930 }
931
932 #[test(async_std::test)]
933 async fn row_metadata_checks() -> Result<(), Box<dyn Error>> {
934 let server = TestServer::new(line!()).await?;
935 let logger1 = server.logger1().await?;
936
937 let mut row_items = vec![
938 ("nil", "f64"),
939 ("First_name", "string"),
940 ("Second", "u64"),
941 ("liar", "bool"),
942 ];
943
944 logger1
945 .set_metadata_row("test.rows.meta", row_items.clone())
946 .await?;
947 logger1
949 .set_metadata_row("test.rows.meta", row_items.clone())
950 .await?;
951
952 row_items.push(("this fails", "f64"));
953 logger1
955 .set_metadata_row("test.rows.meta", row_items)
956 .await
957 .expect_err("Expected a failure but got success");
958 Ok(())
959 }
960 #[test(timeouttest)]
961 async fn timestamp_on_update() -> Result<(), Box<dyn Error>> {
962 let server = TestServer::new(line!()).await?;
963 let logger1 = server.logger1().await?;
964 let ipc = server.proxy().await?;
965 let second = Duration::from_secs(1);
966 ipc.retrieve("modio.logger.metadata.change")
967 .await
968 .expect_err("Should have no data");
969
970 logger1
971 .set_metadata_name("customer.key.key", "Some customer name")
972 .await
973 .expect("Should be able to set name");
974 let res = logger1
975 .get_metadata("customer.key.key")
976 .await
977 .expect("Should have metadata");
978 assert!(res.name.is_some());
979 let metric = ipc
980 .retrieve("modio.logger.metadata.change")
981 .await
982 .expect("Should have data for metadata changed");
983
984 sleep(second).await;
986
987 logger1
988 .set_metadata_name("customer.key.key", "Some customer name")
989 .await
990 .expect("Should be able to set name to the same thing");
991 let metric2 = ipc
992 .retrieve("modio.logger.metadata.change")
993 .await
994 .expect("Should have data for metadata changed");
995 assert_eq!(
996 metric.timestamp, metric2.timestamp,
997 "Timestamp should not change when no metadata updated."
998 );
999 sleep(second).await;
1001 logger1
1002 .set_metadata_name("customer.key.key", "Some OTHER name")
1003 .await
1004 .expect("Should be able to change the name");
1005 let metric3 = ipc
1006 .retrieve("modio.logger.metadata.change")
1007 .await
1008 .expect("Should have data for metadata changed");
1009 assert!(
1010 metric.timestamp < metric3.timestamp,
1011 "Timestamp should have updated"
1012 );
1013 Ok(())
1014 }
1015}