#![allow(clippy::module_name_repetitions)]
use log::{debug, error, info, warn};
use fsipc::legacy::{PreparedPoint, Transaction};
use fsipc::unixtime;
use super::timefail;
use modio_logger_db::Datastore;
use zbus::{dbus_interface, SignalContext};
mod errors;
mod keys;
pub use errors::LogErr;
mod ping;
pub use ping::LoggerPing;
mod builder;
pub use builder::Builder;
/// State behind the logger service.
///
/// Holds the [`Datastore`] handle that every storage call goes through and
/// the [`timefail::Timefail`] helper that is consulted for TIMEFAIL and
/// time-adjust state.
pub struct Logger {
/// Storage handle used for measurements, transactions and batches.
ds: Datastore,
/// Provider of the TIMEFAIL / adjust state queried by the methods of this type.
timefail: timefail::Timefail,
}
impl Logger {
    /// Wraps an opened datastore and a timefail helper into a `Logger`.
    ///
    /// If `timefail` reports an active TIMEFAIL state, the datastore is asked
    /// to fail its pending transactions first. Queued transactions are failed
    /// in every case before the value is returned.
    ///
    /// # Errors
    ///
    /// Propagates any error from the two datastore calls as [`LogErr`].
    pub async fn new(timefail: timefail::Timefail, ds: Datastore) -> Result<Self, LogErr> {
        let in_timefail = timefail.is_timefail();
        if in_timefail {
            info!("Failing all pending change requests due to TIMEFAIL");
            ds.transaction_fail_pending().await?;
        }
        ds.fail_queued_transactions().await?;

        let logger = Self { ds, timefail };
        Ok(logger)
    }

    /// Housekeeping pass, driven from [`call_periodic`].
    ///
    /// In order: persists data, applies a pending time adjustment (if one is
    /// flagged and available), fails queued transactions, deletes old
    /// transactions and old log data, and finally lets the datastore decide
    /// about vacuum/shrink. The first failing step aborts the pass.
    async fn periodic(&self) -> Result<(), LogErr> {
        info!("Doing periodic stuff like timefail");
        self.ds.persist_data().await?;

        // Only ask for the adjustment value when the adjust flag is raised.
        let pending_adjust = if self.timefail.is_adjust() {
            self.timefail.get_adjust().await
        } else {
            None
        };
        if let Some(adjust) = pending_adjust {
            info!("Time jump has happened, adjusting data with {}", adjust);
            self.ds.fix_timefail(adjust).await?;
            self.timefail.remove_adjust().await?;
        }

        let failed = self.ds.fail_queued_transactions().await?;
        info!("Failed {} queued transactions from db", failed);

        let stale_transactions = self.ds.delete_old_transactions().await?;
        info!("Removed {} old transactions from db", stale_transactions);

        let stale_values = self.ds.delete_old_logdata().await?;
        info!("Removed {} old log values from db", stale_values);

        self.ds.need_vacuum_or_shrink().await?;
        Ok(())
    }

    /// Entry point for constructing a `Logger` through [`Builder`].
    #[must_use]
    pub fn builder() -> Builder {
        Builder::new()
    }
}
/// Reads the address of the `wan` network interface and reduces it to its
/// lowercase hex digits.
///
/// The content of `/sys/class/net/wan/address` is lowercased and every
/// character outside `0-9` / `a-f` (separators, trailing newline) is dropped.
/// If the file cannot be read, the all-zero address is used instead, which
/// yields `"000000000000"`.
async fn get_mac() -> String {
    use async_std::fs;

    let wan = std::path::Path::new("/sys/class/net/wan/address");
    let address = fs::read_to_string(&wan).await.map_or_else(
        |_| String::from("00:00:00:00:00:00"),
        |data| data.to_lowercase(),
    );
    address
        .chars()
        .filter(|c| matches!(c, '0'..='9' | 'a'..='f'))
        .collect()
}
#[dbus_interface(name = "se.modio.logger.fsipc")]
impl Logger {
#[allow(clippy::unused_self)]
const fn ping(&self) -> &str {
"Ping? Pong"
}
#[allow(clippy::unused_self)]
fn valid_key(&mut self, key: &str) -> bool {
keys::valid_key(key).is_ok()
}
#[allow(clippy::unused_self)]
async fn get_boxid(&self) -> String {
get_mac().await
}
async fn retrieve(&mut self, key: &str) -> Result<(fsipc::legacy::Measure,), LogErr> {
keys::valid_key(key)?;
let res = self.ds.get_last_datapoint(key).await;
match res {
Ok(dat) => {
let val = fsipc::legacy::Measure::from(dat);
Ok((val,))
}
Err(_e) => Err(LogErr::NotFound("Key not found in storage".to_string())),
}
}
async fn retrieve_all(&mut self) -> Result<Vec<fsipc::legacy::Measure>, LogErr> {
let mut result = self.ds.get_latest_logdata().await?;
let res: Vec<fsipc::legacy::Measure> = result
.drain(0..)
.map(fsipc::legacy::Measure::from)
.collect();
Ok(res)
}
#[dbus_interface(signal)]
async fn store_signal(
ctxt: &SignalContext<'_>,
key: &str,
value: &str,
when: u64,
) -> zbus::Result<()>;
async fn store(
&self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
value: &str,
) -> Result<(), LogErr> {
keys::valid_key(key)?;
let when = unixtime();
let timefail = self.timefail.is_timefail();
#[allow(clippy::cast_possible_wrap)]
let db_when = when as i64;
self.ds.insert(key, value, db_when, timefail).await?;
if !key.starts_with("modio.") {
Self::store_signal(&ctxt, key, value, when).await?;
};
Ok(())
}
async fn store_with_time(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
value: &str,
when: u64,
) -> Result<(), LogErr> {
keys::valid_key(key)?;
let timefail = false;
#[allow(clippy::cast_possible_wrap)]
let db_when = when as i64;
self.ds.insert(key, value, db_when, timefail).await?;
if !key.starts_with("modio.") {
Self::store_signal(&ctxt, key, value, when).await?;
};
Ok(())
}
#[dbus_interface(signal)]
async fn transaction_added(ctxt: &SignalContext<'_>, key: &str) -> zbus::Result<()>;
async fn transaction_add(
&mut self,
#[zbus(signal_context)] ctxt: SignalContext<'_>,
key: &str,
expected: &str,
target: &str,
token: &str,
) -> Result<(), LogErr> {
keys::valid_key(key)?;
keys::valid_token(token)?;
if self.ds.has_transaction(token).await? {
warn!(
"Duplicate transaction (key: {}, token: {}) Ignoring for backwards compatibility.",
key, token
);
return Ok(());
}
self.ds
.transaction_add(key, expected, target, token)
.await?;
Self::transaction_added(&ctxt, key).await?;
Ok(())
}
async fn transaction_get(&mut self, prefix: &str) -> Result<Vec<Transaction>, LogErr> {
debug!("Retrieving transactions beginning with {}", prefix);
let mut res = self.ds.transaction_get(prefix).await?;
let res: Vec<Transaction> = res.drain(0..).map(Transaction::from).collect();
Ok(res)
}
async fn transaction_fail(&mut self, t_id: u64) -> Result<(), LogErr> {
debug!("Marking transaction: t_id={}, failed", t_id);
let timefail = self.timefail.is_timefail();
#[allow(clippy::cast_possible_wrap)]
let t_id = t_id as i64;
let count = self.ds.transaction_fail(t_id, timefail).await?;
if count > 0 {
Ok(())
} else {
Err(LogErr::NotFound("No such ID".into()))
}
}
async fn transaction_pass(&mut self, t_id: u64) -> Result<(), LogErr> {
debug!("Marking transaction: t_id={}, passed ", t_id);
let timefail = self.timefail.is_timefail();
#[allow(clippy::cast_possible_wrap)]
let t_id = t_id as i64;
let count = self.ds.transaction_pass(t_id, timefail).await?;
if count > 0 {
Ok(())
} else {
Err(LogErr::NotFound("No such ID".into()))
}
}
async fn prepare_datapoints(&mut self, maximum: u32) -> Result<Vec<PreparedPoint>, LogErr> {
prepare_range_check(maximum)?;
let mut data = self.ds.get_batch(maximum).await?;
let result: Vec<PreparedPoint> = data.drain(0..).map(PreparedPoint::from).collect();
Ok(result)
}
async fn prepare_modio_datapoints(
&mut self,
maximum: u32,
) -> Result<Vec<PreparedPoint>, LogErr> {
prepare_range_check(maximum)?;
let mut data = self.ds.get_internal_batch(maximum).await?;
let result: Vec<PreparedPoint> = data.drain(0..).map(PreparedPoint::from).collect();
Ok(result)
}
async fn remove_prepared(&mut self, items: Vec<i64>) -> Result<(), LogErr> {
if items.is_empty() {
return Err(LogErr::NotFound("Empty set".to_string()));
}
if !items.iter().all(|x| *x >= 0_i64) {
return Err(LogErr::NotFound("Invalid index".to_string()));
}
self.ds.drop_batch(&items).await?;
Ok(())
}
}
/// Validates the batch size requested by the `prepare_*` calls.
///
/// Accepts `1..=250`. Zero is rejected as "Too small", anything above 250 as
/// "Too big"; both are reported as [`LogErr::NotFound`].
fn prepare_range_check(num: u32) -> Result<(), LogErr> {
    if num == 0 {
        return Err(LogErr::NotFound("Too small".to_string()));
    }
    if num > 250 {
        return Err(LogErr::NotFound("Too big".to_string()));
    }
    Ok(())
}
pub async fn call_periodic(iface: zbus::InterfaceDerefMut<'_, Logger>) -> Result<(), zbus::Error> {
let res = iface.periodic().await;
match res {
Ok(o) => Ok(o),
Err(e) => {
error!("Periodic task failed {}", e);
let err = zbus::Error::Unsupported;
Err(err)
}
}
}
#[cfg(test)]
pub mod tests {
use super::*;
use crate::testing::{launch_server, test_server_with_paths, Tempbase, TestPaths};
use fsipc::legacy::fsipcProxy;
use std::error::Error;
type TestResult = Result<(), Box<dyn Error>>;
use futures_util::future::FutureExt;
use std::future::Future;
/// Runs one test body against a freshly launched server.
///
/// `line` is handed to `launch_server` (callers pass `line!()`); `func`
/// receives the proxy that `launch_server` returns. After the body has
/// finished, the shutdown event is notified and the server task cancelled.
/// The body's own result is returned unchanged.
async fn run_testcase<'a, F, T>(line: u32, func: F) -> TestResult
where
    F: FnOnce(fsipcProxy<'a>) -> T,
    T: Future<Output = TestResult> + Send + 'static,
{
    let (proxy, shutdown, server_task) = launch_server(line).await?;
    let outcome = func(proxy).await;
    // Tear the server down before reporting, whatever the body returned.
    shutdown.notify(1);
    server_task.cancel().await;
    outcome
}
/// Smoke test for the `run_testcase` harness: the proxy is usable and a
/// `done` call comes back as an error.
#[async_std::test]
async fn use_run_testcase() -> TestResult {
    async fn expect_done_to_fail(proxy: fsipcProxy<'_>) -> TestResult {
        let outcome = proxy.done().await;
        assert!(outcome.is_err());
        Ok(())
    }
    run_testcase(line!(), expect_done_to_fail).await
}
/// Two consecutive `ping` calls both return the fixed reply.
#[async_std::test]
async fn ping_pong_test() -> TestResult {
    async fn ping_twice(proxy: fsipcProxy<'_>) -> TestResult {
        // Both calls are made before either reply is checked.
        let (first, second) = (proxy.ping().await?, proxy.ping().await?);
        assert_eq!(first, "Ping? Pong");
        assert_eq!(second, "Ping? Pong");
        Ok(())
    }
    run_testcase(line!(), ping_twice).await
}
/// Calling `done` on the proxy must be answered with an error.
#[async_std::test]
async fn done_gives_error_test() -> TestResult {
    async fn call_done(proxy: fsipcProxy<'_>) -> TestResult {
        let reply = proxy.done().await;
        assert!(reply.is_err());
        Ok(())
    }
    run_testcase(line!(), call_done).await
}
/// A value written with `store` is returned by `retrieve` for the same key.
#[async_std::test]
async fn store_retrieve() -> TestResult {
    async fn round_trip(proxy: fsipcProxy<'_>) -> TestResult {
        proxy.store("test.key", "abc123").await?;
        let measure = proxy.retrieve("test.key").await?;
        assert_eq!(measure.key, "test.key");
        assert_eq!(measure.value, "abc123");
        Ok(())
    }
    run_testcase(line!(), round_trip).await
}
/// After two timestamped writes to one key, `retrieve` yields the later one,
/// including its timestamp.
#[async_std::test]
async fn store_buffer() -> TestResult {
    async fn latest_wins(proxy: fsipcProxy<'_>) -> TestResult {
        let samples = [("abc123", 1_494_602_107), ("abc1234", 1_494_602_108)];
        for &(value, when) in &samples {
            proxy.store_with_time("test.key", value, when).await?;
        }
        let latest = proxy.retrieve("test.key").await?;
        assert_eq!(latest.key, "test.key");
        assert_eq!(latest.value, "abc1234");
        assert_eq!(latest.timestamp, 1_494_602_108);
        Ok(())
    }
    run_testcase(line!(), latest_wins).await
}
/// `retrieve_all` returns one entry per key, each carrying the most recent
/// value, in the order `test.key`, `test.key2`.
#[async_std::test]
async fn retrieve_all_test() -> TestResult {
    async fn one_entry_per_key(proxy: fsipcProxy<'_>) -> TestResult {
        let timed = [("abc123", 1_494_602_107), ("abc1234", 1_494_602_108)];
        for &(value, when) in &timed {
            proxy.store_with_time("test.key", value, when).await?;
        }
        proxy.store("test.key2", "abcdefg").await?;

        let everything = proxy.retrieve_all().await?;
        assert_eq!(everything.len(), 2);

        let head = everything.first().expect("Should have value");
        assert_eq!(head.key, "test.key");
        assert_eq!(head.value, "abc1234");
        assert_eq!(head.timestamp, 1_494_602_108);

        let tail = everything.get(1).expect("Should have value");
        assert_eq!(tail.key, "test.key2");
        assert_eq!(tail.value, "abcdefg");
        Ok(())
    }
    run_testcase(line!(), one_entry_per_key).await
}
/// Of two added transactions only the one whose key matches the requested
/// prefix is returned, and it carries transaction id 1.
#[async_std::test]
async fn transaction_adding_test() -> TestResult {
    async fn add_and_filter(proxy: fsipcProxy<'_>) -> TestResult {
        proxy
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        proxy
            .transaction_add("dummy.test.one", "should not", "be present", "013")
            .await?;

        let matching = proxy.transaction_get("test.test").await?;
        assert_eq!(matching.len(), 1);
        let only = &matching[0];
        assert_eq!(only.key, "test.test.one");
        assert_eq!(only.t_id, 1, "Transaction ID mismatch");
        Ok(())
    }
    run_testcase(line!(), add_and_filter).await
}
/// Adding the same transaction token twice succeeds both times but leaves a
/// single transaction behind.
#[async_std::test]
async fn transaction_dupe_adding_test() -> TestResult {
    async fn transaction_dupe_adding(proxy: fsipcProxy<'_>) -> TestResult {
        proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await?;
        // Second submission with the identical token must not be an error.
        proxy
            .transaction_add("test.test.one", "first", "second", "1638290048")
            .await
            .expect("duplicated tokens should not cause error");

        let stored = proxy.transaction_get("test.test").await?;
        assert_eq!(stored.len(), 1);
        Ok(())
    }
    run_testcase(line!(), transaction_dupe_adding).await
}
use futures_util::stream::StreamExt;
/// Adding a transaction emits the `transaction_added` signal carrying the key.
#[async_std::test]
async fn transaction_signal_test() -> TestResult {
    async fn signal_carries_key(logger: fsipcProxy<'_>) -> TestResult {
        // Subscribe first so the signal cannot be missed.
        let mut added = logger.receive_transaction_added().await?;
        // Drive the call and the wait for the signal together.
        let (_, received) = futures::future::join(
            logger.transaction_add("test.test.transaction_signal", "first", "second", "012"),
            added.next(),
        )
        .await;
        let signal = received.unwrap();
        let args = signal.args().unwrap();
        assert_eq!(args.key, "test.test.transaction_signal");
        Ok(())
    }
    run_testcase(line!(), signal_carries_key).await
}
/// Transactions that were failed or passed are no longer listed; a
/// transaction added afterwards is the only one left and gets id 3.
#[async_std::test]
async fn transaction_passing_test() -> TestResult {
    async fn resolve_then_add(logger: fsipcProxy<'_>) -> TestResult {
        logger
            .transaction_add("test.test.one", "first", "second", "012")
            .await?;
        logger
            .transaction_add("test.test.two", "uno", "dos", "0113")
            .await?;

        // Resolve both: the first is failed, the second passed.
        let pending = logger.transaction_get("test.test").await?;
        logger.transaction_fail(pending[0].t_id).await?;
        logger.transaction_pass(pending[1].t_id).await?;

        logger
            .transaction_add("test.test.three", "etta", "tvåa", "0114")
            .await?;

        let remaining = logger.transaction_get("test.test").await?;
        assert_eq!(remaining.len(), 1);
        let newest = &remaining[0];
        assert_eq!(newest.key, "test.test.three");
        assert_eq!(newest.t_id, 3, "Transaction id mismatch");
        Ok(())
    }
    run_testcase(line!(), resolve_then_add).await
}
/// Every key reported by `retrieve_all` can also be fetched individually
/// with `retrieve`.
#[async_std::test]
async fn retrieving_data_test() -> TestResult {
    async fn each_key_retrievable(ipc: fsipcProxy<'_>) -> TestResult {
        // Three writes per key, in the listed order.
        let writes = [
            ("test.test.one", ["first", "second", "third"]),
            ("test.test.two", ["1", "2", "3"]),
        ];
        for (key, values) in &writes {
            for value in values {
                ipc.store(key, value).await?;
            }
        }

        let listed = ipc.retrieve_all().await?;
        for measure in &listed {
            let single = ipc.retrieve(&measure.key).await?;
            assert_eq!(single.key, measure.key);
        }
        Ok(())
    }
    run_testcase(line!(), each_key_retrievable).await
}
/// Keys that the service must accept as valid.
#[async_std::test]
async fn valid_key_test() -> TestResult {
    async fn accepts_good_keys(ipc: fsipcProxy<'_>) -> TestResult {
        let good = ["modio.software.development", "abc", "a.b.c", "a_b.c"];
        for key in &good {
            assert!(ipc.valid_key(key).await?);
        }
        Ok(())
    }
    run_testcase(line!(), accepts_good_keys).await
}
/// Keys with empty segments, a leading or trailing dot, or any whitespace
/// must be reported as invalid.
#[async_std::test]
async fn invalid_key_test() -> TestResult {
    async fn rejects_bad_keys(ipc: fsipcProxy<'_>) -> TestResult {
        let bad = [
            "modio..invalid",
            ".modio..invalid",
            "modio.invalid.",
            "modio. invalid",
            "modio.in valid",
            "modio.invalid ",
            " modio.invalid",
        ];
        for key in &bad {
            assert!(!ipc.valid_key(key).await?);
        }
        Ok(())
    }
    run_testcase(line!(), rejects_bad_keys).await
}
/// Passing a transaction works once; passing the same id again is an error.
#[async_std::test]
async fn transaction_double() -> TestResult {
    async fn pass_twice(ipc: fsipcProxy<'_>) -> TestResult {
        let key = "test.test.one";
        let first = "first";
        let second = "second";
        let token = zbus::Guid::generate();

        ipc.store(key, first).await?;
        ipc.transaction_add(key, first, second, token.as_str())
            .await?;

        let listed = ipc.transaction_get(key).await?;
        let head = listed
            .first()
            .expect("Should have at least one transaction");

        let first_attempt = ipc.transaction_pass(head.t_id).await;
        assert!(first_attempt.is_ok());
        let second_attempt = ipc.transaction_pass(head.t_id).await;
        assert!(second_attempt.is_err());
        Ok(())
    }
    run_testcase(line!(), pass_twice).await
}
/// Plays a consumer working through three identical transactions: each one
/// whose `expected` matches the current value is passed (and the value moves
/// to `target`), the others are failed. Afterwards none may be left.
#[async_std::test]
async fn transaction_tests() -> TestResult {
    async fn inner_transactions(ipc: fsipcProxy<'_>) -> TestResult {
        let key = "test.test.one";
        let first = "first";
        let second = "second";
        let mut our_value = "first";

        ipc.store(key, our_value).await?;
        ipc.store_with_time(key, our_value, unixtime()).await?;

        // Three transactions with identical content but distinct tokens.
        for _ in 0..3 {
            let guid = zbus::Guid::generate();
            ipc.transaction_add(key, first, second, guid.as_str())
                .await?;
        }

        let transactions = ipc.transaction_get(key).await?;
        for trn in &transactions {
            if trn.key == key {
                let applies = our_value == trn.expected;
                if applies {
                    our_value = &trn.target;
                    ipc.transaction_pass(trn.t_id).await?;
                } else {
                    ipc.transaction_fail(trn.t_id).await?;
                }
            }
            // The current value is reported after every transaction.
            ipc.store(key, our_value).await?;
        }

        let leftover = ipc.transaction_get(key).await?;
        assert_eq!(leftover.len(), 0);
        Ok(())
    }
    run_testcase(line!(), inner_transactions).await
}
/// Storing a regular key emits a store signal; storing a `modio.` key must
/// leave no signal ready on the stream.
#[async_std::test]
async fn no_modio_signals() -> TestResult {
    async fn no_modio_signals_inner(ipc: fsipcProxy<'_>) -> TestResult {
        let mut signals = ipc.receive_store_signal().await?;

        ipc.store("test.test.test", "value").await?;
        let announced = signals.next().await.unwrap();
        assert_eq!(announced.args()?.key, "test.test.test");

        ipc.store("modio.test.test", "value").await?;
        // Poll once without waiting: nothing may be pending.
        let pending = signals.next().now_or_never();
        assert!(pending.is_none());
        Ok(())
    }
    run_testcase(line!(), no_modio_signals_inner).await
}
/// Prepared datapoints stay available until they are explicitly removed:
/// two consecutive `prepare_datapoints` calls both see all five values, and
/// after `remove_prepared` nothing is left.
#[async_std::test]
async fn submit_consume() -> TestResult {
    async fn inner_submit_consume(ipc: fsipcProxy<'_>) -> TestResult {
        for n in 0..5 {
            let value = n.to_string();
            ipc.store("test.foo", &value).await?;
        }

        let batch = ipc.prepare_datapoints(10).await?;
        assert_eq!(batch.len(), 5);
        let head = &batch[0];
        assert_eq!(head.key, "test.foo");
        assert_eq!(head.value, "0");

        let again = ipc.prepare_datapoints(10).await?;
        assert_eq!(again.len(), 5);
        // NOTE(review): the tail element is read from the first batch, not
        // from `again` — confirm that this is the intended check.
        let tail = &batch[4];
        assert_eq!(tail.key, "test.foo");
        assert_eq!(tail.value, "4");

        let to_remove: Vec<i64> = batch.iter().map(|point| point.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let after = ipc.prepare_datapoints(30).await?;
        assert!(after.is_empty());
        Ok(())
    }
    run_testcase(line!(), inner_submit_consume).await
}
/// `prepare_modio_datapoints` only hands out `modio.` keys; removing those
/// leaves the regular datapoints untouched.
#[async_std::test]
async fn submit_modio_consume() -> TestResult {
    async fn inner_submit_modio_consume(ipc: fsipcProxy<'_>) -> TestResult {
        for n in 0..5 {
            let value = n.to_string();
            ipc.store("test.foo", &value).await?;
            ipc.store("modio.test.foo", &value).await?;
        }

        let internal = ipc.prepare_modio_datapoints(20).await?;
        assert_eq!(internal.len(), 5);
        for point in &internal {
            assert_eq!(&point.key, "modio.test.foo");
        }

        // Asking again before removal yields the same amount.
        let again = ipc.prepare_modio_datapoints(10).await?;
        assert_eq!(again.len(), 5);

        let to_remove: Vec<i64> = internal.iter().map(|point| point.id).collect();
        ipc.remove_prepared(to_remove).await?;

        let modio_after = ipc.prepare_modio_datapoints(30).await?;
        assert!(modio_after.is_empty());
        let regular_after = ipc.prepare_datapoints(30).await?;
        assert!(!regular_after.is_empty());
        Ok(())
    }
    run_testcase(line!(), inner_submit_modio_consume).await
}
/// Batches respect the requested maximum and continue where the previously
/// removed batch ended: 25 values are consumed as 10 + 15, then nothing.
#[async_std::test]
async fn test_get_batch() -> TestResult {
    async fn inner_get_batch(ipc: fsipcProxy<'_>) -> TestResult {
        for n in 0..25 {
            let value = n.to_string();
            ipc.store("test.foo", &value).await?;
        }

        let first_batch = ipc.prepare_datapoints(10).await?;
        assert_eq!(first_batch.len(), 10);
        let head = &first_batch[0];
        assert_eq!(head.key, "test.foo");
        assert_eq!(head.value, "0");
        let consumed: Vec<i64> = first_batch.iter().map(|point| point.id).collect();
        ipc.remove_prepared(consumed).await?;

        let second_batch = ipc.prepare_datapoints(30).await?;
        assert_eq!(second_batch.len(), 15);
        let head = &second_batch[0];
        assert_eq!(head.key, "test.foo");
        assert_eq!(head.value, "10");
        let consumed: Vec<i64> = second_batch.iter().map(|point| point.id).collect();
        ipc.remove_prepared(consumed).await?;

        let leftover = ipc.prepare_datapoints(30).await?;
        assert!(leftover.is_empty());
        Ok(())
    }
    run_testcase(line!(), inner_get_batch).await
}
/// `get_boxid` answers with the all-zero id.
///
/// NOTE(review): this is the fallback value of `get_mac`, so the test
/// presumably relies on the host having no readable
/// `/sys/class/net/wan/address` — confirm for the CI environment.
#[async_std::test]
async fn test_get_mac() -> TestResult {
    async fn boxid_is_zeroes(logger: fsipcProxy<'_>) -> TestResult {
        let boxid = logger.get_boxid().await?;
        assert_eq!(boxid, "000000000000");
        Ok(())
    }
    run_testcase(line!(), boxid_is_zeroes).await
}
/// A fresh server has neither transactions nor stored values.
#[async_std::test]
async fn test_empty_transactions() -> TestResult {
    async fn starts_empty(ipc: fsipcProxy<'_>) -> TestResult {
        let pending = ipc.transaction_get("test").await?;
        assert_eq!(pending.len(), 0);

        let stored = ipc.retrieve_all().await?;
        assert_eq!(stored.len(), 0);
        Ok(())
    }
    run_testcase(line!(), starts_empty).await
}
/// A prefix ending in a dot (`"mbus."`) is accepted by `transaction_get` and
/// yields an empty list on a fresh server.
#[async_std::test]
async fn test_transaction_get_prefix() -> TestResult {
    async fn dotted_prefix(ipc: fsipcProxy<'_>) -> TestResult {
        let found = ipc.transaction_get("mbus.").await?;
        assert_eq!(found.len(), 0);
        Ok(())
    }
    run_testcase(line!(), dotted_prefix).await
}
/// Values stored through one server instance are still retrievable from a
/// second instance started on the same paths.
///
/// The test manages its own servers (it does not go through `run_testcase`)
/// because both instances have to share one `Tempbase` and one bus name.
#[async_std::test]
async fn resume_database_test() -> TestResult {
    /// Starts `test_server_with_paths` under `server_name` on `path`, waits
    /// until it signals readiness and returns a proxy connected to it.
    ///
    /// Returned alongside the proxy: the event used to tell the server that
    /// the test is complete, and the handle of the spawned server task.
    async fn spawn_server(
        server_name: &str,
        path: TestPaths,
    ) -> Result<
        (
            fsipcProxy<'static>,
            event_listener::Event,
            async_std::task::JoinHandle<()>,
        ),
        Box<dyn Error>,
    > {
        use crate::LOGGER_PATH;

        // `test_complete` is notified by the caller to stop the server.
        let test_complete = event_listener::Event::new();
        let test_waiter = test_complete.listen();
        // `server` is handed to the server task; we wait on its listener
        // below before connecting.
        let server = event_listener::Event::new();
        let server_ready = server.listen();

        dbg!(&server_name);
        let client_name = server_name.to_string();
        let task = async_std::task::spawn(test_server_with_paths(
            server_name.to_string(),
            server,
            test_waiter,
            path,
        ));
        server_ready.await;

        let conn = zbus::Connection::session().await?;
        let proxy = fsipcProxy::builder(&conn)
            .path(LOGGER_PATH)?
            .destination(client_name)?
            .build()
            .await?;
        // Make sure the server actually answers before handing the proxy out.
        proxy.ping().await?;
        Ok((proxy, test_complete, task))
    }

    let base = Tempbase::new();
    let server_name = format!("se.modio.logger.TestCase{}", line!());
    let expected = vec![
        ("test.test.one", "1"),
        ("test.test.two", "2"),
        ("test.test.three", "3"),
        ("test.test.four", "4"),
        ("test.test.five", "5"),
        ("test.test.six", "6"),
    ];

    // First run: store everything and read it back from the same instance.
    {
        let (logger, done, task) = spawn_server(&server_name, base.paths.clone()).await?;
        {
            for (key, val) in &expected {
                logger.store(key, val).await?;
            }
            for (key, value) in &expected {
                let data = logger.retrieve(key).await?;
                assert_eq!(&data.value, value);
                assert_eq!(&data.key, key);
            }
        }
        done.notify(1);
        // The task is awaited to completion here rather than cancelled.
        task.await;
    };

    // Second run: a new instance on the same paths must return the same data.
    {
        let (logger, done, task) = spawn_server(&server_name, base.paths.clone()).await?;
        for (key, value) in &expected {
            let data = logger.retrieve(key).await?;
            assert_eq!(&data.value, value);
            assert_eq!(&data.key, key);
        }
        done.notify(1);
        task.await;
    }
    Ok(())
}
}