use log::{debug, info, warn};
use crate::logger::{keys, LogErr};
use crate::timefail;
use crate::types::DataTypeError;
use crate::types::Metadata;
use crate::types::Unit;
use crate::types::ValueMap;
use fsipc::logger1::SensorMode;
use modio_logger_db::{Datastore, Metric};
use std::collections::HashMap;
use zbus::{dbus_interface, zvariant, SignalContext};
mod builder;
pub use builder::Builder;
pub struct Logger1 {
ds: Datastore,
timefail: timefail::Timefail,
}
impl Logger1 {
pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
if timefail.is_timefail() {
info!("We are currently in TIMEFAIL mode.");
let count = ds.transaction_fail_pending().await?;
if count > 0 {
info!("Failed {count} pending change requests due to TIMEFAIL");
}
}
Ok(Self { ds, timefail })
}
#[must_use]
pub fn builder() -> Builder {
Builder::new()
}
#[cfg(test)]
pub(crate) async fn persist_data(&self) {
self.ds
.persist_data()
.await
.expect("Failed to persist data");
}
}
const fn valid_metric(value: &zvariant::Value<'_>) -> Result<(), DataTypeError> {
use zvariant::Value::{Bool, Str, F64, I16, I32, I64, U16, U32, U64, U8};
match value {
U64(_) | U32(_) | U16(_) | U8(_) | F64(_) | Str(_) | Bool(_) | I16(_) | I32(_) | I64(_) => {
Ok(())
}
_ => Err(DataTypeError::Unknown),
}
}
fn value_to_string(value: &zvariant::Value<'_>) -> Option<String> {
use zvariant::Value::{Bool, Str, F64, I16, I32, I64, U16, U32, U64, U8};
match value {
U64(v) => Some(format!("{v}")),
U32(v) => Some(format!("{v}")),
U16(v) => Some(format!("{v}")),
U8(v) => Some(format!("{v}")),
F64(v) => Some(format!("{v}")),
Str(v) => Some(format!("{v}")),
Bool(v) => Some(format!("{}", u32::from(*v))),
I16(v) => Some(format!("{v}")),
I32(v) => Some(format!("{v}")),
I64(v) => Some(format!("{v}")),
_ => None,
}
}
#[dbus_interface(name = "se.modio.logger.Logger1")]
impl Logger1 {
#[dbus_interface(signal)]
async fn metadata_updated(ctxt: &SignalContext<'_>, key: &str) -> zbus::Result<()>;
async fn set_metadata_name(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
name: String,
) -> Result<(), LogErr> {
if self.ds.metadata_set_name(key, &name).await? {
info!("Updated name of key={} to name='{}'", &key, &name);
Self::metadata_updated(&ctxt, key).await?;
}
Ok(())
}
async fn set_metadata_description(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
description: String,
) -> Result<(), LogErr> {
if self.ds.metadata_set_description(key, &description).await? {
info!(
"Updated description of key={} to description='{}'",
&key, &description
);
Self::metadata_updated(&ctxt, key).await?;
}
Ok(())
}
async fn set_metadata_mode(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
mode: SensorMode,
) -> Result<(), LogErr> {
let new_mode: modio_logger_db::SensorMode = mode.into();
if self.ds.metadata_set_mode(key, &new_mode).await? {
info!("Updated mode of key={} to mode='{:?}'", &key, &new_mode);
Self::metadata_updated(&ctxt, key).await?;
}
Ok(())
}
async fn set_metadata_value_map(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
value_map: ValueMap,
) -> Result<(), LogErr> {
if self.ds.metadata_set_enum(key, &value_map).await? {
info!(
"Updated metadata of key={} to enum='{:?}'",
&key, &value_map
);
Self::metadata_updated(&ctxt, key).await?;
}
Ok(())
}
async fn set_metadata_unit(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
unit: String,
) -> Result<(), LogErr> {
use crate::types::UnitError;
use std::convert::TryFrom;
let unit = Unit::try_from(unit)?.into_inner();
let res = self.ds.metadata_set_unit(key, &unit).await;
match res {
Ok(true) => {
info!("Updated unit of key={} to unit={}", &key, &unit);
Self::metadata_updated(&ctxt, key).await?;
Ok(())
}
Ok(false) => Ok(()),
Err(modio_logger_db::Error::Unique { source }) => {
debug!("Throwing away database error: {:?}", source);
Err(UnitError::Unique.into())
}
Err(e) => Err(e.into()),
}
}
async fn get_metadata(&mut self, key: &str) -> Result<Metadata, LogErr> {
info!("Fetching metadata for key={}", &key);
let res = self.ds.get_metadata(key).await?;
if let Some(ret) = res {
Ok(Metadata::from(ret))
} else {
Err(LogErr::NotFound("NoData".into()))
}
}
#[dbus_interface(signal)]
async fn store_signal(
ctxt: &SignalContext<'_>,
batch: Vec<(String, zvariant::Value<'_>, i64)>,
) -> zbus::Result<()>;
async fn store_batch(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
mut batch: HashMap<String, zvariant::Value<'_>>,
) -> Result<(), LogErr> {
let when = modio_logger_db::inixtime();
let timefail = self.timefail.is_timefail();
for (key, value) in &batch {
if let Err(e) = keys::valid_key(key) {
warn!("Invalid key for key='{key}' value='{value:?}' err='{e}'");
return Err(e.into());
}
if let Err(e) = valid_metric(value) {
warn!("Invalid data for key='{key}' value='{value:?}' err='{e}'");
return Err(e.into());
}
}
let db_batch: Vec<Metric> = batch
.iter()
.filter_map(|(key, value)| value_to_string(value).map(|val| (key, val)))
.map(|(key, value)| Metric {
name: key.clone(),
value,
time: when,
})
.collect();
self.ds.insert_bulk(db_batch, timefail).await?;
let payload: Vec<_> = batch
.drain()
.filter(|(key, _)| !key.starts_with("modio."))
.map(|(key, value)| (key, value, when))
.collect();
if !payload.is_empty() {
Self::store_signal(&ctxt, payload).await?;
}
Ok(())
}
async fn store(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: String,
value: zvariant::Value<'_>,
) -> Result<(), LogErr> {
let batch = HashMap::from([(key, value)]);
self.store_batch(ctxt, batch).await?;
Ok(())
}
}
#[cfg(test)]
mod test {
use super::Logger1;
use crate::conn::make_connection;
use crate::testing::TestServer;
use crate::types::Unit;
use fsipc::logger1::SensorMode;
use futures_util::{FutureExt, StreamExt};
use modio_logger_db::Datastore;
use modio_logger_db::SqlitePoolBuilder;
use std::collections::HashMap;
use std::error::Error;
use tempfile;
use zbus::zvariant;
#[tokio::test]
async fn set_metadata_works() -> Result<(), Box<dyn Error>> {
const PATH: &str = "/se/modio/logger/metadata";
let _elog = env_logger::builder().is_test(true).try_init();
let dbfile = tempfile::Builder::new()
.prefix("set_metadata_works")
.suffix(".sqlite")
.tempfile()
.expect("Error on tempfile");
let pool = SqlitePoolBuilder::new()
.db_path(dbfile.path())
.build()
.await
.expect("Error opening database");
let ds = Datastore::new(pool).await?;
{
let connection = make_connection(true).await?;
let logger = Logger1::builder()
.development(true)
.datastore(ds)
.build()
.await?;
connection.object_server().at(PATH, logger).await?;
let iface_ref = connection
.object_server()
.interface::<_, Logger1>(PATH)
.await?;
let mut logger = iface_ref.get_mut().await;
let ctx = iface_ref.signal_context();
logger
.set_metadata_name(
ctx.to_owned(),
"modio.key.key",
"Some internal name".to_string(),
)
.await?;
logger
.set_metadata_description(
ctx.to_owned(),
"modio.key.key",
"Some internal description".to_string(),
)
.await?;
logger
.set_metadata_name(
ctx.to_owned(),
"customer.key.key.key",
"Some customer name".to_string(),
)
.await?;
logger
.set_metadata_mode(
ctx.to_owned(),
"customer.key.key.key",
SensorMode::ReadWrite,
)
.await?;
logger
.set_metadata_description(
ctx.to_owned(),
"customer.key.key.key",
"Some customer description".to_string(),
)
.await?;
logger.persist_data().await;
}
let pool = SqlitePoolBuilder::new()
.db_path(dbfile.path())
.build()
.await
.expect("Error opening database");
let ds = Datastore::new(pool).await?;
let res = ds.metadata_get_names().await?;
assert!(res.len() == 2);
eprintln!("{res:?}");
Ok(())
}
#[tokio::test]
async fn set_unit_override_fails() -> Result<(), Box<dyn Error>> {
const PATH: &str = "/se/modio/logger/testcase";
let _elog = env_logger::builder().is_test(true).try_init();
let ds = Datastore::temporary().await;
{
let connection = make_connection(true).await?;
let logger = Logger1::builder()
.development(true)
.datastore(ds)
.build()
.await?;
connection.object_server().at(PATH, logger).await?;
let iface_ref = connection
.object_server()
.interface::<_, Logger1>(PATH)
.await?;
let mut logger = iface_ref.get_mut().await;
let ctx = iface_ref.signal_context();
logger
.set_metadata_name(
ctx.to_owned(),
"customer.key.key",
"Some customer name".to_string(),
)
.await?;
logger
.set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
.await?;
logger
.set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("Cel"))
.await?;
let res = logger
.set_metadata_unit(ctx.to_owned(), "customer.key.key.key", Unit::string("m"))
.await;
let e = res.expect_err("should not be able to set it twice");
assert!(e.to_string().contains("May not replace unit"));
}
Ok(())
}
#[tokio::test]
async fn set_read_only() -> Result<(), Box<dyn Error>> {
const PATH: &str = "/se/modio/logger/testcase/set_read_only";
let _elog = env_logger::builder().is_test(true).try_init();
let ds = Datastore::temporary().await;
{
let connection = make_connection(true).await?;
let logger = Logger1::builder()
.development(true)
.datastore(ds)
.build()
.await?;
connection.object_server().at(PATH, logger).await?;
let iface_ref = connection
.object_server()
.interface::<_, Logger1>(PATH)
.await?;
let mut logger = iface_ref.get_mut().await;
let ctx = iface_ref.signal_context();
logger
.set_metadata_name(
ctx.to_owned(),
"customer.key.key",
"Some customer name".to_string(),
)
.await?;
let res = logger.get_metadata("customer.key.key").await?;
assert!(res.mode.is_none());
logger
.set_metadata_mode(ctx.to_owned(), "customer.key.key", SensorMode::ReadOnly)
.await?;
let res = logger.get_metadata("customer.key.key").await?;
assert_eq!(res.mode, Some(SensorMode::ReadOnly));
}
Ok(())
}
#[tokio::test]
async fn no_modio_signals_in_batch() -> Result<(), Box<dyn Error>> {
let server = TestServer::new(line!()).await?;
let logger1 = server.logger1().await?;
let mut stream = logger1.receive_store_signal().await?;
let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
batch.insert("test.test.string".into(), String::from("string").into());
batch.insert("test.test.int".into(), (42_u64).into());
batch.insert("test.test.float".into(), (0.3_f64).into());
batch.insert("modio.test.bool.true".into(), (true).into());
batch.insert("modio.test.bool.false".into(), (false).into());
logger1.store_batch(batch).await?;
let sig = stream.next().await.unwrap();
let payload = sig.args()?;
assert!(payload.batch.len() == 3, "Should have three out of 4 keys");
for (key, _, _) in payload.batch {
assert!(key.starts_with("test.test"));
}
let last_signal = stream.next().now_or_never();
assert!(last_signal.is_none());
let ipc = server.proxy().await?;
{
let m = ipc.retrieve("test.test.string").await?;
assert_eq!(m.key, "test.test.string");
assert_eq!(m.value, "string");
}
{
let m = ipc.retrieve("test.test.int").await?;
assert_eq!(m.key, "test.test.int");
assert_eq!(m.value, "42");
}
{
let m = ipc.retrieve("test.test.float").await?;
assert_eq!(m.key, "test.test.float");
assert_eq!(m.value, "0.3");
}
{
let m = ipc.retrieve("modio.test.bool.true").await?;
assert_eq!(m.key, "modio.test.bool.true");
assert_eq!(m.value, "1");
}
{
let m = ipc.retrieve("modio.test.bool.false").await?;
assert_eq!(m.key, "modio.test.bool.false");
assert_eq!(m.value, "0");
}
Ok(())
}
#[tokio::test]
async fn modio_only_no_signals() -> Result<(), Box<dyn Error>> {
let server = TestServer::new(line!()).await?;
let logger1 = server.logger1().await?;
let mut stream = logger1.receive_store_signal().await?;
let mut batch = HashMap::<String, zvariant::Value<'_>>::new();
batch.insert("modio.test.bool.true".into(), (true).into());
batch.insert("modio.test.bool.false".into(), (false).into());
logger1.store_batch(batch).await?;
assert!(
stream.next().now_or_never().is_none(),
"Should have no signals pending"
);
Ok(())
}
#[tokio::test]
async fn store_singles_work_as_well() -> Result<(), Box<dyn Error>> {
let server = TestServer::new(line!()).await?;
let logger1 = server.logger1().await?;
let mut stream = logger1.receive_store_signal().await?;
logger1
.store("test.test.string".into(), String::from("string").into())
.await?;
logger1
.store("test.test.int".into(), (42_u64).into())
.await?;
logger1
.store("test.test.float".into(), (0.3_f64).into())
.await?;
logger1
.store("modio.test.bool.true".into(), (true).into())
.await?;
logger1
.store("modio.test.bool.false".into(), (false).into())
.await?;
for _ in 0..3 {
let sig = stream.next().await.unwrap();
let payload = sig.args()?;
assert!(payload.batch.len() == 1, "May have 1 after single stores");
for (key, _, _) in payload.batch {
assert!(key.starts_with("test.test"));
}
}
assert!(
stream.next().now_or_never().is_none(),
"Should have no signals pending"
);
Ok(())
}
}