pub use super::db::SqlitePool;
pub use crate::buffer::{inixtime, unixtime};
use crate::buffer::{Buffer, TimeStatus, VecBuffer};
use crate::types::{Metric, TXMetric};
pub use crate::Error;
use log::{debug, info};
use std::sync::Arc;

#[derive(Clone)]
pub struct Datastore {
    pool: SqlitePool,
    buffer: VecBuffer,
    _dbfile: Option<Arc<tempfile::NamedTempFile>>,
    drop_event: Arc<tokio::sync::Notify>,
}
impl Datastore {
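    /// Build a `Datastore` on top of an existing [`SqlitePool`].
    ///
    /// Also wires up the drop task (see `drop_event`) that flushes the
    /// in-memory buffer and closes the pool once the last clone of this
    /// store is dropped.
    ///
    /// A minimal usage sketch; builder settings and the path are
    /// illustrative, not prescriptive:
    ///
    /// ```ignore
    /// use std::path::Path;
    ///
    /// let pool = SqlitePoolBuilder::new()
    ///     .db_path(Path::new("/var/lib/app/database.sqlite"))
    ///     .migrate(true)
    ///     .build()
    ///     .await?;
    /// let ds = Datastore::new(pool).await?;
    /// ds.insert("test.sensor.temp", "21.5", 1_620_850_252, false)
    ///     .await?;
    /// ```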
pub async fn new(pool: SqlitePool) -> Result<Self, Error> {
let buffer = VecBuffer::new().await?;
let drop_event = Datastore::drop_event(pool.clone(), buffer.clone());
let res = Self {
pool,
buffer,
_dbfile: None,
drop_event,
};
Ok(res)
}
#[must_use]
pub fn pool(&self) -> SqlitePool {
self.pool.clone()
}
#[cfg(test)]
async fn get_name(&self, key: &str) -> Result<String, Error> {
let select_sensor = sqlx::query_as("SELECT name FROM sensor WHERE sensor.name = ?");
let res: (String,) = select_sensor.bind(key).fetch_one(&self.pool).await?;
Ok(res.0)
}
#[cfg(test)]
async fn check_tempdata(&self) -> Result<(), Error> {
let oldest = self.buffer.oldest().await?;
info!("Oldst value is {}", oldest);
Ok(())
}
#[cfg(test)]
async fn sensor_id(&self, key: &str) -> Result<i64, Error> {
let add_sensor = sqlx::query("INSERT OR IGNORE INTO sensor(name) VALUES (?)");
add_sensor.bind(key).execute(&self.pool).await?;
let sensor_by_name =
sqlx::query_as("SELECT s_id FROM sensor WHERE sensor.name = $1 ORDER BY s_id DESC");
let row: (i64,) = sensor_by_name.bind(key).fetch_one(&self.pool).await?;
Ok(row.0)
}
pub async fn count_transactions(&self) -> Result<i64, Error> {
let count_transactions = sqlx::query_as("SELECT COUNT(*) FROM changes");
let row: (i64,) = count_transactions.fetch_one(&self.pool).await?;
Ok(row.0)
}
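    /// Count pending external metrics: rows in `logdata` with status
    /// `NONE` whose sensor name is not under the internal `modio.%` prefix.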
pub async fn count_metrics(&self) -> Result<i32, Error> {
let count_query = sqlx::query_scalar!(
r#"SELECT count(*) FROM logdata JOIN sensor USING(s_id) WHERE logdata.status = 'NONE' AND sensor.name NOT LIKE 'modio.%';"#
);
let count = count_query.fetch_one(&self.pool).await?;
Ok(count)
}
pub async fn count_metrics_internal(&self) -> Result<i32, Error> {
let count_query = sqlx::query_scalar!(
r#"SELECT count(*) FROM logdata JOIN sensor USING(s_id) WHERE logdata.status = 'NONE' AND sensor.name LIKE 'modio.%';"#
);
let count = count_query.fetch_one(&self.pool).await?;
Ok(count)
}
pub async fn count_metrics_removed(&self) -> Result<i32, Error> {
let count_query = sqlx::query_scalar!(
r#"SELECT count(*) FROM logdata WHERE logdata.status = 'REMOVED';"#
);
let count = count_query.fetch_one(&self.pool).await?;
Ok(count)
}
pub async fn count_metrics_timefail(&self) -> Result<i32, Error> {
let count_query = sqlx::query_scalar!(
r#"SELECT count(*) FROM logdata WHERE logdata.status = 'TIMEFAIL';"#
);
let count = count_query.fetch_one(&self.pool).await?;
Ok(count)
}
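    /// Insert a single key/value/time metric. Convenience wrapper that
    /// forwards one `Metric` to `insert_bulk`.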
pub async fn insert(
&self,
key: &str,
value: &str,
time: i64,
timefail: bool,
) -> Result<(), Error> {
let arf = vec![Metric {
name: key.into(),
value: value.into(),
time,
}];
self.insert_bulk(arf, timefail).await
}
async fn insert_bulk_buffer(&self, data: Vec<Metric>, status: TimeStatus) -> Result<(), Error> {
for metric in data {
self.buffer.add_metric(metric, status).await?;
}
Ok(())
}
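    /// Insert a batch of metrics. Sensor names are created up front in a
    /// single database transaction; the metrics themselves are only staged
    /// in the in-memory buffer (tagged `TimeFail` when `timefail` is set)
    /// until the next persist.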
pub async fn insert_bulk(&self, data: Vec<Metric>, timefail: bool) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
for metric in &data {
sqlx::query!("INSERT OR IGNORE INTO sensor(name) VALUES (?)", metric.name)
.execute(&mut *tx)
.await?;
}
tx.commit().await?;
let status = if timefail {
TimeStatus::TimeFail
} else {
TimeStatus::None
};
self.insert_bulk_buffer(data, status).await?;
Ok(())
}
}
impl Datastore {
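    /// Report whether the buffer has grown past the persist cutoff
    /// (currently 100 values).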
pub async fn should_persist(&self) -> Result<bool, Error> {
const CUTOFF: usize = 100;
let num = self.buffer.count().await?;
info!("We have {} values in buffer", num);
Ok(num > CUTOFF)
}
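    /// Report whether the oldest buffered value is older than the maximum
    /// buffer age (currently 57 seconds).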
pub async fn should_persist_age(&self) -> Result<bool, Error> {
const MAX_AGE: i64 = 57;
let now = inixtime();
let eldest = self.buffer.oldest().await?;
let age = now - eldest;
debug!("Oldest value in buffer is {} ( age: {}s )", &eldest, &age);
Ok(age > MAX_AGE)
}
pub async fn should_persist_key(&self, key: &str) -> Result<bool, Error> {
let res = self.buffer.has_name(key).await?;
Ok(res)
}
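    /// Write a drained buffer to the `logdata` table in one transaction,
    /// resolving each metric's `s_id` from its sensor name.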
async fn persist_data_raw(
delete_buffer: Vec<(Metric, TimeStatus)>,
pool: &SqlitePool,
) -> Result<(), Error> {
let mut pool_tx = pool.begin().await?;
for (metric, tstatus) in delete_buffer {
let status = match tstatus {
TimeStatus::None => "NONE",
TimeStatus::TimeFail => "TIMEFAIL",
};
let add_logdata = sqlx::query!(
"\
INSERT INTO logdata (s_id, value, time, status) \
SELECT s_id, $2, $3, $4 \
FROM sensor WHERE sensor.name = $1",
metric.name,
metric.value,
metric.time,
status,
);
add_logdata.execute(&mut *pool_tx).await?;
}
pool_tx.commit().await?;
Ok(())
}
pub async fn persist_data(&self) -> Result<(), Error> {
let delete_buffer = self.buffer.consume_metrics().await?;
Datastore::persist_data_raw(delete_buffer, &self.pool).await?;
Ok(())
}
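    /// Spawn the deallocation task and return its `Notify` handle.
    ///
    /// `Drop` cannot be async, so `Drop for Datastore` only calls
    /// `notify_waiters()`; the task spawned here performs the actual async
    /// work of draining the buffer, persisting it, and closing the pool.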
fn drop_event(pool: SqlitePool, buffer: VecBuffer) -> Arc<tokio::sync::Notify> {
let notify = Arc::new(tokio::sync::Notify::new());
let waiting = notify.clone();
tokio::task::spawn(async move {
waiting.notified().await;
info!("Persisting Buffered data to disk due to deallocation");
let delete_buffer = buffer
.consume_metrics()
.await
.expect("Failed to drain buffer");
if !delete_buffer.is_empty() {
Datastore::persist_data_raw(delete_buffer, &pool)
.await
.expect("Failed to persist data");
};
info!("Closing pool due to permanence.");
pool.close().await;
});
notify
}
}
impl Drop for Datastore {
fn drop(&mut self) {
let size = self.pool.size();
let closed = self.pool.is_closed();
debug!("Dropping datastore size={size}, closed={closed}");
self.drop_event.notify_waiters();
}
}
impl Datastore {
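    /// Fetch up to `size` untransmitted external metrics (status `NONE`,
    /// name not under `modio.%`), flushing the write buffer first.
    ///
    /// A sketch of the transmit cycle as exercised by the tests below;
    /// the transmission step itself is elided:
    ///
    /// ```ignore
    /// let batch = ds.get_batch(50).await?;
    /// // ... transmit the batch ...
    /// let ids: Vec<i64> = batch.iter().map(|m| m.id).collect();
    /// ds.drop_batch(&ids).await?; // mark transmitted rows as REMOVED
    /// ```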
pub async fn get_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
self.persist_data().await?;
let get_external_batch = sqlx::query_as!(
TXMetric,
"\
SELECT id, name, value, time \
FROM logdata \
JOIN sensor USING(s_id) \
WHERE logdata.status = 'NONE' AND sensor.name NOT LIKE 'modio.%' \
            ORDER BY id ASC LIMIT $1",
size
);
let res = get_external_batch.fetch_all(&self.pool).await?;
Ok(res)
}
pub async fn get_internal_batch(&self, size: u32) -> Result<Vec<TXMetric>, Error> {
self.persist_data().await?;
let get_internal_batch = sqlx::query_as!(
TXMetric,
"\
SELECT id, name, value, time \
FROM logdata \
JOIN sensor USING(s_id) \
WHERE logdata.status = 'NONE' AND sensor.name LIKE 'modio.%' \
            ORDER BY id ASC LIMIT $1",
size
);
let res = get_internal_batch.fetch_all(&self.pool).await?;
Ok(res)
}
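    /// Mark the given `logdata` ids as `REMOVED` after successful
    /// transmission; ids that are no longer in status `NONE` are left
    /// untouched.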
pub async fn drop_batch(&self, ids: &[i64]) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
for id in ids {
let query = sqlx::query!(
"UPDATE logdata SET status = 'REMOVED' WHERE id = $1 AND status = 'NONE'",
id
);
query.execute(&mut *tx).await?;
}
tx.commit().await?;
Ok(())
}
}
impl Datastore {
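    /// Shift every `TIMEFAIL` row by `adjust` seconds and reset its status
    /// to `NONE`, returning the number of rows updated.
    ///
    /// A sketch of the intended call site, assuming the caller computes the
    /// clock offset once wall-clock time is known to be good:
    ///
    /// ```ignore
    /// let offset = (good_now - bad_now) as f32; // hypothetical offset
    /// let fixed = ds.fix_timefail(offset).await?;
    /// ```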
pub async fn fix_timefail(&self, adjust: f32) -> Result<u64, Error> {
self.persist_data().await?;
let mut tx = self.pool.begin().await?;
let fix_timefail = sqlx::query!(
"\
UPDATE logdata \
SET time = time + $1, status = 'NONE' \
WHERE status = 'TIMEFAIL'",
adjust
);
let res = fix_timefail.execute(&mut *tx).await?;
tx.commit().await?;
let count = res.rows_affected();
info!(
"Updated TIMEFAIL status for count={} items with offset={}",
&count, adjust
);
Ok(count)
}
}
impl Datastore {
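    /// Return the most recent datapoint for `key`, flushing the buffer
    /// first if it holds values for that key. Errors when the key has no
    /// logged data at all.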
pub async fn get_last_datapoint(&self, key: &str) -> Result<Metric, Error> {
if self.should_persist_key(key).await? {
self.persist_data().await?;
};
let last_value = sqlx::query_as!(
Metric,
r#"SELECT name as "name!", value as "value!", time as "time!" FROM logdata JOIN sensor USING(s_id) WHERE sensor.name = $1 ORDER BY time DESC LIMIT 1"#,
key
);
let res = last_value.fetch_one(&self.pool).await?;
Ok(res)
}
pub async fn get_latest_logdata(&self) -> Result<Vec<Metric>, Error> {
self.persist_data().await?;
let get_last_all_points = sqlx::query_as!(
Metric,
r#"SELECT name as "name!", value as "value!", MAX(time) as "time!: i64" FROM logdata JOIN sensor USING(s_id) GROUP BY name ORDER BY time ASC"#,
);
let res = get_last_all_points.fetch_all(&self.pool).await?;
Ok(res)
}
}
mod changes {
use super::Datastore;
use crate::buffer::unixtime;
use crate::types::Transaction;
use crate::Error;
use log::{debug, info};
impl Datastore {
pub async fn has_transaction(&self, token: &str) -> Result<bool, Error> {
let count_query =
sqlx::query_scalar!("SELECT count(*) FROM changes WHERE token = $1", token);
let res = count_query.fetch_one(&self.pool).await?;
Ok(res > 0)
}
pub async fn transaction_add(
&self,
key: &str,
expected: &str,
target: &str,
token: &str,
) -> Result<(), Error> {
let mut tx = self.pool.begin().await?;
info!(
"Storing transaction {}, '{}' => '{}'",
&key, &expected, &target
);
let internal_key = format!("mytemp.internal.change.{key}");
sqlx::query!("INSERT OR IGNORE INTO sensor(name) VALUES (?)", key)
.execute(&mut *tx)
.await?;
sqlx::query!(
"INSERT OR IGNORE INTO sensor(name) VALUES (?)",
internal_key
)
.execute(&mut *tx)
.await?;
sqlx::query!(
"\
INSERT INTO changes(s_id, token, expected, target) \
SELECT s_id, $2, $3, $4 \
FROM sensor \
WHERE sensor.name = $1",
key,
token,
expected,
target,
)
.execute(&mut *tx)
.await?;
tx.commit().await?;
Ok(())
}
pub async fn transaction_get(&self, prefix: &str) -> Result<Vec<Transaction>, Error> {
let mut tx = self.pool.begin().await?;
debug!("Marking transactions as pending for prefix={}", prefix);
            // Correlate on s_id so that only transactions whose sensor
            // matches the prefix are marked, not every pending transaction.
            let transaction_pending = sqlx::query!(
                "\
                UPDATE changes SET status = 'PENDING' \
                WHERE changes.status = 'NONE' \
                AND changes.s_id IN (\
                SELECT s_id FROM sensor WHERE sensor.name LIKE $1 || '%')",
                prefix
            );
transaction_pending.execute(&mut *tx).await?;
let transaction_get = sqlx::query_as!(
Transaction,
"\
SELECT t_id, name, expected, target, status \
FROM changes JOIN sensor USING(s_id) \
WHERE changes.status = 'PENDING' AND sensor.name LIKE $1 || '%'",
prefix
);
let res = transaction_get.fetch_all(&mut *tx).await?;
tx.commit().await?;
Ok(res)
}
async fn transaction_mark_inner(
&self,
transaction_id: i64,
success: bool,
timefail: bool,
) -> Result<u64, Error> {
let logval = if success { "1" } else { "0" };
let transaction_result_str = if success { "SUCCESS" } else { "FAILED" };
let status = if timefail { "TIMEFAIL" } else { "NONE" };
let when = unixtime();
let transaction_result_key = sqlx::query!(
"\
SELECT 'mytemp.internal.change.'||sensor.name as name \
FROM sensor JOIN changes USING(s_id) \
WHERE changes.t_id=$1",
transaction_id
);
let change_key = transaction_result_key.fetch_one(&self.pool).await?;
let change_key = change_key.name;
let count = {
let mut tx = self.pool.begin().await?;
let mark_transaction = sqlx::query!(
"\
UPDATE changes SET status = $2 \
WHERE changes.t_id = $1 AND changes.status = 'PENDING'\
",
transaction_id,
transaction_result_str,
);
let res = mark_transaction.execute(&mut *tx).await?;
let count = res.rows_affected();
sqlx::query!("INSERT OR IGNORE INTO sensor(name) VALUES (?)", change_key)
.execute(&mut *tx)
.await?;
#[allow(clippy::cast_possible_wrap)]
let new_when = when as i64;
let add_logdata = sqlx::query!(
"\
INSERT INTO logdata (s_id, value, time, status) \
SELECT s_id, $2, $3, $4 \
FROM sensor \
WHERE sensor.name = $1",
change_key,
logval,
new_when,
status,
);
add_logdata.execute(&mut *tx).await?;
tx.commit().await?;
count
};
Ok(count)
}
pub async fn transaction_fail(
&self,
transaction_id: i64,
timefail: bool,
) -> Result<u64, Error> {
debug!("Failing transaction with id={}", transaction_id);
let count = self
.transaction_mark_inner(transaction_id, false, timefail)
.await?;
Ok(count)
}
pub async fn transaction_pass(
&self,
transaction_id: i64,
timefail: bool,
) -> Result<u64, Error> {
debug!("Passing transaction with id={}", transaction_id);
let count = self
.transaction_mark_inner(transaction_id, true, timefail)
.await?;
Ok(count)
}
pub async fn transaction_fail_pending(&self) -> Result<u64, Error> {
let transaction_fail_pending = sqlx::query!(
"\
UPDATE changes SET status = 'FAILED' \
WHERE status = 'PENDING'"
);
info!("Failing all pending transactions");
let count = {
let mut tx = self.pool.begin().await?;
let res = transaction_fail_pending.execute(&mut *tx).await?;
tx.commit().await?;
res.rows_affected()
};
Ok(count)
}
}
}
mod clean {
use super::Datastore;
use crate::Error;
use log::{debug, info, warn};
impl Datastore {
pub async fn delete_old_transactions(&self) -> Result<u64, Error> {
let transaction_delete_done = sqlx::query!(
"\
DELETE FROM changes \
WHERE status in ('FAILED', 'SUCCESS') \
and t_id NOT IN (\
SELECT MAX(t_id) as t_id \
FROM changes \
GROUP BY s_id ORDER BY t_id ASC)",
);
debug!("Deleting old transactions");
let count = {
let mut tx = self.pool.begin().await?;
let res = transaction_delete_done.execute(&mut *tx).await?;
tx.commit().await?;
res.rows_affected()
};
Ok(count)
}
pub async fn fail_queued_transactions(&self) -> Result<u64, Error> {
let fail_queued_transactions = sqlx::query!(
"\
WITH victims AS ( \
SELECT t_id, s_id, \
ROW_NUMBER() OVER (\
PARTITION BY s_id ORDER BY t_id DESC) AS row \
FROM changes WHERE changes.status='NONE') \
UPDATE changes SET status='FAILED' \
WHERE t_id IN (SELECT t_id FROM victims WHERE victims.row >= 17);"
);
debug!("Marking piled up queued transactions as failed");
let count = {
let mut tx = self.pool.begin().await?;
let res = fail_queued_transactions.execute(&mut *tx).await?;
tx.commit().await?;
res.rows_affected()
};
            if count > 0 {
                warn!(
                    "Found keys with many unhandled transactions; marked {} as failed.",
                    count
                );
            };
Ok(count)
}
pub async fn delete_old_logdata(&self) -> Result<u64, Error> {
let logdata_delete_done = sqlx::query!(
"\
DELETE FROM logdata \
WHERE status = 'REMOVED' AND id NOT IN (\
SELECT id FROM (\
SELECT id, s_id, MAX(time) as time \
FROM logdata GROUP BY s_id ORDER BY time ASC));",
);
debug!("Deleting old and removed log data");
let count = {
let mut tx = self.pool.begin().await?;
let res = logdata_delete_done.execute(&mut *tx).await?;
tx.commit().await?;
res.rows_affected()
};
Ok(count)
}
pub async fn delete_random_data(&self) -> Result<u64, Error> {
let count1 = self.delete_old_logdata().await?;
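            // random() yields a signed 64-bit integer; dividing by 2^63 and
            // rescaling maps it into [0, 1), so roughly a quarter of all
            // rows are deleted.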
let delete_random = sqlx::query!(
"DELETE FROM logdata WHERE ((random() / 9223372036854775808.0 + 1) / 2) < 0.25",
);
warn!("Deleting random data");
let count2 = {
let mut tx = self.pool.begin().await?;
let res = delete_random.execute(&mut *tx).await?;
tx.commit().await?;
res.rows_affected()
};
let result = count1 + count2;
info!(
"Deleted items, random={}, old={}, total={}.",
count2, count1, result
);
sqlx::query!("VACUUM;").execute(&self.pool).await?;
Ok(result)
}
pub async fn need_vacuum_or_shrink(&self) -> Result<usize, Error> {
const VACUUM_MIN_RATIO: f32 = 0.8;
const VACUUM_MIN_SIZE: f32 = 512.0 * 1024.0;
const MAX_SIZE: f32 = 24.0 * 1024.0 * 1024.0;
let res: (i32,) = sqlx::query_as("pragma page_count")
.fetch_one(&self.pool)
.await?;
#[allow(clippy::cast_precision_loss)]
let pages = res.0 as f32;
let res: (i32,) = sqlx::query_as("pragma freelist_count")
.fetch_one(&self.pool)
.await?;
#[allow(clippy::cast_precision_loss)]
let freepages = res.0 as f32;
let res: (i32,) = sqlx::query_as("pragma page_size")
.fetch_one(&self.pool)
.await?;
#[allow(clippy::cast_precision_loss)]
let pagesize: f32 = res.0 as f32;
if freepages * pagesize > VACUUM_MIN_SIZE && freepages > pages * VACUUM_MIN_RATIO {
info!("Vacuuming due to size");
sqlx::query("VACUUM;").execute(&self.pool).await?;
}
if freepages == 0.0 && pages * pagesize >= MAX_SIZE {
info!("DB out of size, removing random data");
self.delete_random_data().await?;
}
Ok(0)
}
}
}
impl Datastore {
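    /// Build a throwaway `Datastore` backed by a temporary database file,
    /// mainly for tests. The tempfile handle is stored in the struct so the
    /// file lives exactly as long as the store.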
pub async fn temporary() -> Datastore {
use crate::db::SqlitePoolBuilder;
use tempfile::Builder;
#[cfg(test)]
{
let _elog = env_logger::builder().is_test(true).try_init();
};
let dbfile = Builder::new()
.prefix("database")
.suffix(".sqlite")
.tempfile()
.expect("Error on tempfile");
let pool = SqlitePoolBuilder::new()
.db_path(dbfile.path())
.migrate(true)
.build()
.await
.expect("Failed to build pool");
let buffer = VecBuffer::new().await.expect("Failed to create buffer");
let drop_event = Datastore::drop_event(pool.clone(), buffer.clone());
Self {
pool,
buffer,
_dbfile: Some(dbfile.into()),
drop_event,
}
}
}
#[cfg(test)]
fn metrc(name: &str, value: &str, time: i64) -> super::Metric {
super::Metric {
name: name.into(),
value: value.into(),
time,
}
}
#[cfg(test)]
mod tests {
use super::*;
use crate::db::SqlitePoolBuilder;
use std::error::Error;
type TestResult = Result<(), Box<dyn Error>>;
#[tokio::test]
async fn has_file_database() -> TestResult {
use tempfile::Builder;
let tempfile = Builder::new()
.prefix("loggerdb_has_file_database")
.suffix(".sqlite")
.tempfile()?;
let named_path = tempfile.path();
let pool = SqlitePoolBuilder::new()
.db_path(named_path)
.migrate(true)
.build()
.await
.unwrap();
sqlx::query("SELECT * FROM sensor where sensor.name = 'mytemp.internal.sensors'")
.fetch_one(&pool)
.await?;
pool.close().await;
drop(pool);
Ok(())
}
#[tokio::test]
async fn datastore_tempdata() {
let ds = Datastore::temporary().await;
ds.check_tempdata()
.await
.expect("Should be able to get data from temp table");
}
#[tokio::test]
async fn datastore_names() -> TestResult {
let ds = Datastore::temporary().await;
let res = ds.get_name("mytemp.internal.sensors").await?;
assert_eq!(res, "mytemp.internal.sensors");
Ok(())
}
#[tokio::test]
async fn add_one_name() -> TestResult {
let ds = Datastore::temporary().await;
let res = ds.sensor_id("test.test.test").await?;
assert_eq!(res, 2);
Ok(())
}
#[tokio::test]
async fn add_two_names() -> TestResult {
let ds = Datastore::temporary().await;
let res = ds.sensor_id("test.test.test").await?;
assert_eq!(res, 2);
let res = ds.sensor_id("test.test.test").await?;
assert_eq!(res, 2);
let res = ds.sensor_id("test.test.test").await?;
assert_eq!(res, 2);
let res = ds.get_name("test.test.test").await?;
assert_eq!(res, "test.test.test");
Ok(())
}
#[tokio::test]
async fn insert_timefail() -> TestResult {
let ds = Datastore::temporary().await;
ds.insert("test.test.ok", "value", 1_620_850_252, false)
.await?;
ds.insert("test.test.ok", "value1", 1_620_850_253, false)
.await?;
ds.insert("test.test.ok", "value2", 1_620_850_255, false)
.await?;
ds.insert("test.test.fimefail", "value", 1_620_850_252, true)
.await?;
Ok(())
}
#[tokio::test]
async fn retrieve_last_empty() -> TestResult {
let ds = Datastore::temporary().await;
let pool = ds.pool();
ds.insert("test.test.ok", "value", 1_620_850_252, false)
.await?;
let before = ds.get_last_datapoint("test.test.ok").await;
assert!(before.is_ok(), "Should have a value");
let delete_all = sqlx::query("DELETE FROM logdata");
delete_all.execute(&pool).await?;
let after = ds.get_last_datapoint("test.test.ok").await;
assert!(after.is_err(), "Should fail");
Ok(())
}
#[tokio::test]
async fn retrieve_sorting() -> TestResult {
let ds = Datastore::temporary().await;
ds.insert("test.test.one", "value0", 1_620_850_000, false)
.await?;
ds.insert("test.test.one", "value1", 1_620_850_111, true)
.await?;
ds.insert("test.test.one", "value3", 1_620_850_222, false)
.await?;
ds.insert("test.test.two", "value3", 1_620_850_333, true)
.await?;
ds.insert("test.test.two", "value1", 1_620_850_222, false)
.await?;
ds.insert("test.test.two", "value0", 1_620_850_111, true)
.await?;
let res = ds.get_last_datapoint("test.test.two").await?;
assert_eq!(res.name, "test.test.two");
assert_eq!(res.value, "value3");
assert_eq!(res.time, 1_620_850_333);
let res = ds.get_last_datapoint("test.test.one").await?;
assert_eq!(res.name, "test.test.one");
assert_eq!(res.value, "value3");
assert_eq!(res.time, 1_620_850_222);
Ok(())
}
#[tokio::test]
async fn insert_bulk() -> TestResult {
let ds = Datastore::temporary().await;
let vals = vec![
metrc("test.test.one", "etta", 16_208_501_111),
metrc("test.test.two", "etta", 16_208_501_111),
metrc("test.test.three", "etta", 16_208_501_112),
metrc("test.test.one", "tvåa", 16_208_502_222),
metrc("test.test.two", "tvåa", 16_208_502_223),
metrc("test.test.three", "tvåa", 16_208_502_222),
metrc("test.test.one", "trea", 16_208_503_333),
metrc("test.test.two", "trea", 16_208_503_331),
metrc("test.test.three", "trea", 16_208_503_333),
metrc("test.test.one", "fyra", 16_208_504_444),
];
ds.insert_bulk(vals, true).await?;
let res = ds.get_last_datapoint("test.test.one").await?;
assert_eq!(res.value, "fyra");
let res = ds.get_last_datapoint("test.test.three").await?;
assert_eq!(res.value, "trea");
let res = ds.get_last_datapoint("test.test.two").await?;
assert_eq!(res.value, "trea");
Ok(())
}
#[tokio::test]
async fn insert_persist() -> TestResult {
let ds = Datastore::temporary().await;
let vals = vec![
metrc("test.test.one", "etta", 16_208_501_111),
metrc("test.test.two", "etta", 16_208_501_111),
metrc("test.test.three", "etta", 16_208_501_112),
];
let seq = 0..1000;
let more: Vec<Metric> = seq
.map(|x| metrc("test.test.three", &format!("{x}"), inixtime()))
.collect();
ds.insert_bulk(vals, true).await?;
let first = ds.should_persist().await?;
assert!(!first, "Should not need persist with only 3 values");
let should = ds.should_persist_key("test.test.one").await?;
assert!(
should,
"Should need persist because test.test.one is in buffer"
);
ds.insert_bulk(more, true).await?;
let second = ds.should_persist().await?;
assert!(second, "Should need persist with more values");
let res = ds.persist_data().await;
        assert!(res.is_ok(), "We should have successfully persisted data");
let third = ds.should_persist().await?;
assert!(!third, "We should not need persist again");
let should = ds.should_persist_key("test.test.one").await?;
assert!(!should, "This key should no longer be in the buffer");
ds.persist_data().await?;
Ok(())
}
#[tokio::test]
async fn persist_delete_and_purge() -> TestResult {
let ds = Datastore::temporary().await;
let vals: Vec<Metric> = (0..1000)
.map(|x| {
metrc(
&format!("test.test.bulk.{}", x % 7),
&x.to_string(),
inixtime(),
)
})
.collect();
ds.insert_bulk(vals, true).await?;
let should_count = ds.should_persist().await?;
let should_age = ds.should_persist_age().await?;
assert!(should_count, "Should persist based on count");
        assert!(!should_age, "Oldest buffered value should still be fresh");
ds.persist_data().await?;
let count = ds.delete_random_data().await?;
assert!(count > 0, "Should have deleted some data");
Ok(())
}
#[tokio::test]
async fn delete_old_logged_values() -> TestResult {
let ds = Datastore::temporary().await;
ds.insert("test.test.ok", "one", 1_620_850_000, false)
.await?;
ds.insert("test.test.ok", "two", 1_999_950_251, false)
.await?;
let metric = ds.get_last_datapoint("test.test.ok").await?;
assert_eq!(metric.value, "two");
let to_xmit = ds.get_batch(50).await?;
let mut to_remove = Vec::<i64>::with_capacity(5);
for i in to_xmit {
to_remove.push(i.id);
}
ds.drop_batch(&to_remove).await?;
let metric = ds.get_last_datapoint("test.test.ok").await?;
assert_eq!(metric.value, "two");
let count = ds.delete_old_logdata().await?;
assert_eq!(count, 1);
let metric = ds.get_last_datapoint("test.test.ok").await?;
assert_eq!(metric.value, "two");
let count = ds.delete_old_logdata().await?;
assert_eq!(count, 0);
Ok(())
}
#[tokio::test]
async fn get_latest_datapoints() -> TestResult {
let ds = Datastore::temporary().await;
let vals = vec![
metrc("test.test.one", "etta", 16_208_501_111),
metrc("test.test.two", "etta", 16_208_501_111),
metrc("test.test.three", "etta", 16_208_501_112),
metrc("test.test.one", "tvåa", 16_208_502_222),
metrc("test.test.two", "tvåa", 16_208_502_223),
metrc("test.test.three", "tvåa", 16_208_502_222),
metrc("test.test.one", "trea", 16_208_503_333),
metrc("test.test.three", "trea", 16_208_503_333),
metrc("test.test.one", "fyra", 16_208_504_444),
];
ds.insert_bulk(vals, true).await?;
let res = ds.get_latest_logdata().await?;
assert_eq!(res[0].name, "test.test.two");
assert_eq!(res[0].value, "tvåa");
assert_eq!(res[1].name, "test.test.three");
assert_eq!(res[1].value, "trea");
assert_eq!(res[2].name, "test.test.one");
assert_eq!(res[2].value, "fyra");
assert_eq!(res.len(), 3);
Ok(())
}
#[tokio::test]
async fn get_transactions() -> TestResult {
let ds = Datastore::temporary().await;
let vals = vec![
metrc("test.test.one", "etta", 16_208_501_111),
metrc("test.test.two", "tvåa", 16_208_504_444),
];
ds.insert_bulk(vals, false).await?;
ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
.await?;
let res = ds.transaction_get("test.test.one").await?;
assert_eq!(res.len(), 1, "Expecting only one result");
assert_eq!(
res[0].name, "test.test.one",
"Expecting key to match added transaction"
);
        assert_eq!(res[0].expected, "etta", "Expected value mismatch");
assert_eq!(res[0].target, "ettatvåa", "Target value mismatch");
assert_eq!(res[0].t_id, 1, "Transaction ID mismatch");
Ok(())
}
async fn with_keys() -> Datastore {
let ds = Datastore::temporary().await;
let vals = vec![
metrc("test.test.one", "etta", 16_208_501_111),
metrc("test.test.two", "etta", 16_208_501_111),
metrc("test.test.three", "etta", 16_208_501_112),
metrc("test.test.two", "tvåa", 16_208_502_223),
metrc("test.test.three", "tvåa", 16_208_502_222),
metrc("test.test.three", "trea", 16_208_503_333),
];
ds.insert_bulk(vals, false).await.unwrap();
ds
}
#[tokio::test]
async fn transmit_drop() -> TestResult {
let ds = with_keys().await;
ds.insert("modio.test.one", "modio-ett", 16_208_502_222, false)
.await?;
ds.insert("modio.test.one", "modio-ett", 16_208_502_222, true)
.await?;
let to_xmit = ds.get_batch(50).await?;
let mut to_remove = Vec::<i64>::with_capacity(50);
for i in to_xmit {
to_remove.push(i.id);
}
ds.drop_batch(&to_remove).await?;
let second = ds.get_batch(10).await?;
assert_eq!(second.len(), 0, "No elements should remain");
let third = ds.get_internal_batch(5).await?;
assert_eq!(
third.len(),
1,
"Expecting an internal value, because one is timefailed."
);
Ok(())
}
#[tokio::test]
async fn timefail_handling_internal() -> TestResult {
let ds = with_keys().await;
ds.insert("modio.test.one", "modio-ett", 16_208_502_222, true)
.await?;
ds.insert("modio.test.one", "modio-ett", 16_208_502_221, true)
.await?;
let res = ds.get_internal_batch(10).await?;
assert_eq!(res.len(), 0);
ds.fix_timefail(10.0).await?;
let res = ds.get_internal_batch(10).await?;
assert_eq!(res.len(), 2);
Ok(())
}
#[tokio::test]
async fn timefail_handling() -> TestResult {
let ds = with_keys().await;
ds.insert("test.test.one", "modio-ett", 16_208_502_222, true)
.await?;
ds.insert("test.test.two", "modio-två", 16_208_502_221, true)
.await?;
let res = ds.get_batch(10).await?;
assert_eq!(res.len(), 6);
ds.fix_timefail(10.0).await?;
let res = ds.get_batch(10).await?;
assert_eq!(res.len(), 8);
Ok(())
}
#[tokio::test]
async fn fail_transactions() -> TestResult {
let ds = with_keys().await;
ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
.await?;
ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
.await?;
let first = ds.transaction_get("test.test.one").await?;
assert_eq!(first.len(), 1, "Expecting only one result");
assert_eq!(
first[0].name, "test.test.one",
"Expecting key to match added transaction"
);
ds.transaction_fail(first[0].t_id, false).await?;
let logrow = ds
.get_last_datapoint("mytemp.internal.change.test.test.one")
.await?;
assert_eq!(logrow.value, "0");
let second = ds.transaction_get("test.test.one").await?;
assert_eq!(second.len(), 0, "Should not have pending transactions");
let third = ds.transaction_get("test.test").await?;
assert_eq!(third.len(), 1, "Should have pending for test.test.two");
assert_eq!(
third[0].name, "test.test.two",
"Expecting key to match transaction two"
);
Ok(())
}
#[tokio::test]
async fn empty_fetch() {
let ds = with_keys().await;
let res = ds.get_last_datapoint("abc.def.ghi").await;
assert!(res.is_err(), "Should have an error from absent keys");
}
#[tokio::test]
async fn pass_transactions() -> TestResult {
let ds = with_keys().await;
ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
.await?;
ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
.await?;
let first = ds.transaction_get("test.test.one").await?;
assert_eq!(first.len(), 1, "Expecting only one result");
assert_eq!(
first[0].name, "test.test.one",
"Expecting key to match added transaction"
);
ds.transaction_pass(first[0].t_id, false).await?;
let logrow = ds
.get_last_datapoint("mytemp.internal.change.test.test.one")
.await?;
assert_eq!(logrow.value, "1");
let second = ds.transaction_get("test.test.one").await?;
assert_eq!(second.len(), 0, "Should not have pending transactions");
let third = ds.transaction_get("test.test").await?;
assert_eq!(third.len(), 1, "Should have pending for test.test.two");
assert_eq!(
third[0].name, "test.test.two",
"Expecting key to match transaction two"
);
let third_row = ds
.get_last_datapoint("mytemp.internal.change.test.test.two")
.await;
assert!(third_row.is_err());
Ok(())
}
#[tokio::test]
async fn delete_old_transactions() -> TestResult {
let ds = with_keys().await;
ds.transaction_add("test.test.one", "etta", "ettatvåa", "xxxXXxx")
.await?;
ds.transaction_add("test.test.one", "tvåa", "ettatvåa", "zzZZZzzz")
.await?;
ds.transaction_add("test.test.two", "etta", "ettatvåa", "YYyyyyYYY")
.await?;
assert_eq!(ds.count_transactions().await?, 3);
let trans = ds.transaction_get("test.test.one").await?;
ds.transaction_pass(trans[0].t_id, false).await?;
ds.transaction_fail(trans[1].t_id, false).await?;
assert_eq!(ds.count_transactions().await?, 3);
let to_xmit = ds.get_batch(50).await?;
let mut to_remove = Vec::<i64>::with_capacity(5);
for i in to_xmit {
to_remove.push(i.id);
}
ds.drop_batch(&to_remove).await?;
assert_eq!(ds.count_transactions().await?, 3);
let count = ds.delete_old_transactions().await?;
assert_eq!(count, 1);
assert_eq!(ds.count_transactions().await?, 2);
let count = ds.delete_old_transactions().await?;
assert_eq!(count, 0);
Ok(())
}
#[tokio::test]
async fn fail_queued_transactions() -> TestResult {
use uuid::Uuid;
let ds = with_keys().await;
for x in 0..18 {
let tok = Uuid::new_v4().hyphenated().to_string();
let val = format!("newval{x}");
ds.transaction_add("test.test.one", "etta", &val, &tok)
.await?;
}
ds.transaction_add("test.test.two", "etta", "ettatvåa", "xxxXXxx")
.await?;
ds.transaction_add("test.test.three", "tvåa", "ettatvåa", "zzZZZzzz")
.await?;
ds.transaction_add("test.test.four", "etta", "ettatvåa", "YYyyyyYYY")
.await?;
ds.transaction_add("test.test.one", "etta", "lasttarget", "xxXXxxxXXxxx")
.await?;
assert_eq!(ds.count_transactions().await?, 22);
let count = ds.fail_queued_transactions().await?;
assert_eq!(count, 3, "3 should be marked as failed");
ds.delete_old_transactions().await?;
assert_eq!(ds.count_transactions().await?, 19, "19 should exist");
let res = ds.transaction_get("test.test.one").await?;
let lastval = res.last().unwrap();
assert_eq!(
lastval.target, "lasttarget",
"Expecting last transaction to be the last added"
);
Ok(())
}
}
mod metadata {
use super::{Datastore, Error};
use crate::types::Metadata;
use crate::types::SensorMode;
use crate::types::ValueMap;
use log::{debug, info};
use std::collections::{BTreeMap, HashMap};
impl Datastore {
pub async fn metadata_get_names(&self) -> Result<Vec<Metadata>, Error> {
info!("Requested all names");
let mut vals = self.metadata_get_tag("name").await?;
let res = vals
.drain(..)
.map(|(key, name)| Metadata::builder(key).name(name).build())
.collect();
Ok(res)
}
pub async fn metadata_get_units(&self) -> Result<Vec<Metadata>, Error> {
info!("Requested all units");
let mut vals = self.metadata_get_tag("unit").await?;
let res = vals
.drain(..)
.map(|(key, unit)| Metadata::builder(key).unit(unit).build())
.collect();
Ok(res)
}
pub async fn metadata_get_descriptions(&self) -> Result<Vec<Metadata>, Error> {
info!("Requested all units");
let mut vals = self.metadata_get_tag("description").await?;
let res = vals
.drain(..)
.map(|(key, description)| Metadata::builder(key).description(description).build())
.collect();
Ok(res)
}
pub async fn metadata_get_enum(&self) -> Result<Vec<Metadata>, Error> {
info!("Requested all value maps");
let mut vals = self.metadata_get_tag("enum").await?;
let res = vals
.drain(..)
.map(|(key, stringy)| Metadata::builder(key).value_map_string(&stringy).build())
.collect();
Ok(res)
}
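        /// Fetch all single-value tags for `key` and fold them into one
        /// `Metadata` via its builder; returns `None` when no tags exist.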
pub async fn get_metadata(&self, key: &str) -> Result<Option<Metadata>, Error> {
debug!("Requested metadata for key={}", key);
let query = sqlx::query!(
"\
SELECT sensor.name as key, tag, value \
FROM tag_single \
JOIN sensor USING(s_id) \
WHERE sensor.name = $1",
key
);
let pairs = query.fetch_all(&self.pool).await?;
if pairs.is_empty() {
return Ok(None);
}
let mut builder = Metadata::builder(key.to_string());
for val in pairs {
builder = builder.from_pair(&val.tag, val.value);
}
let res = builder.build();
Ok(Some(res))
}
pub async fn get_all_metadata(&self) -> Result<Vec<Metadata>, Error> {
info!("Requested all metadata from DB");
let mut map = HashMap::new();
let query = sqlx::query!(
"\
SELECT sensor.name as key, tag, value \
FROM tag_single \
JOIN sensor USING(s_id)"
);
let pairs = query.fetch_all(&self.pool).await?;
for val in pairs {
let mut builder = map
.remove(&val.key)
.unwrap_or_else(|| Metadata::builder(val.key.to_string()));
builder = builder.from_pair(&val.tag, val.value);
map.insert(val.key, builder);
}
let res: Vec<Metadata> = map.drain().map(|(_, builder)| builder.build()).collect();
Ok(res)
}
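        /// Insert or replace a single-value tag, returning `false` without
        /// touching the database when the stored value already matches.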
async fn metadata_replace_tag(
&self,
key: &str,
tag: &str,
value: &str,
) -> Result<bool, Error> {
if self.metadata_tag_equals(key, tag, value).await {
return Ok(false);
}
debug!(
"tags: Replacing key={} tag={} with value='{}'",
key, tag, value
);
let mut tx = self.pool.begin().await?;
sqlx::query!("INSERT OR IGNORE INTO sensor(name) VALUES (?)", key)
.execute(&mut *tx)
.await?;
let query = sqlx::query!(
"\
INSERT OR REPLACE INTO tag_single (s_id, tag, value) \
SELECT s_id, $2, $3 \
FROM sensor \
WHERE sensor.name = $1",
key,
tag,
value
);
query.execute(&mut *tx).await?;
tx.commit().await?;
Ok(true)
}
async fn metadata_set_tag(&self, key: &str, tag: &str, value: &str) -> Result<(), Error> {
info!("tags: Setting {} for key: {} = '{}'", tag, key, value);
let mut tx = self.pool.begin().await?;
sqlx::query!("INSERT OR IGNORE INTO sensor(name) VALUES (?)", key)
.execute(&mut *tx)
.await?;
let query = sqlx::query!(
"\
INSERT INTO tag_single (s_id, tag, value) \
SELECT s_id, $2, $3 \
FROM sensor \
WHERE sensor.name = $1",
key,
tag,
value
);
query.execute(&mut *tx).await?;
tx.commit().await?;
Ok(())
}
async fn metadata_get_tag(&self, tag: &str) -> Result<Vec<(String, String)>, Error> {
info!("tags: Retrieving all tags of type: {}", tag);
let query = sqlx::query!(
"\
SELECT sensor.name as key, tag_single.value as value \
FROM tag_single \
JOIN sensor USING(s_id) \
WHERE tag_single.tag = $1 \
",
tag
);
let res = query
.map(|val| (val.key, val.value))
.fetch_all(&self.pool)
.await?;
Ok(res)
}
async fn metadata_get_single_tag(&self, key: &str, tag: &str) -> Result<String, Error> {
let query = sqlx::query_scalar!(
"\
SELECT tag_single.value as value \
FROM tag_single \
JOIN sensor USING(s_id) \
WHERE sensor.name = $1 AND tag_single.tag = $2 \
",
key,
tag
);
let res = query.fetch_one(&self.pool).await?;
Ok(res)
}
async fn metadata_tag_equals(&self, key: &str, tag: &str, value: &str) -> bool {
if let Ok(old_val) = self.metadata_get_single_tag(key, tag).await {
old_val == value
} else {
false
}
}
pub async fn metadata_set_name(&self, key: &str, name: &str) -> Result<bool, Error> {
self.metadata_replace_tag(key, "name", name).await
}
pub async fn metadata_set_unit(&self, key: &str, unit: &str) -> Result<bool, Error> {
if let Some(meta) = self.get_metadata(key).await? {
if let Some(u) = meta.u {
if u == unit {
return Ok(false);
}
}
}
self.metadata_set_tag(key, "unit", unit).await?;
Ok(true)
}
pub async fn metadata_set_description(
&self,
key: &str,
description: &str,
) -> Result<bool, Error> {
self.metadata_replace_tag(key, "description", description)
.await
}
pub async fn metadata_set_mode(&self, key: &str, mode: &SensorMode) -> Result<bool, Error> {
self.metadata_replace_tag(key, "mode", mode.as_str()).await
}
pub async fn metadata_set_enum(
&self,
key: &str,
value_map: &ValueMap,
) -> Result<bool, Error> {
let into = {
let sorted_map: BTreeMap<_, _> = value_map.iter().collect();
serde_json::to_string(&sorted_map)?
};
self.metadata_replace_tag(key, "enum", &into).await
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::error::Error as StdError;
type TestResult = Result<(), Box<dyn StdError>>;
#[tokio::test]
async fn get_empty_metadata() -> TestResult {
let ds = Datastore::temporary().await;
ds.insert("modio.test.key", "one", 1_620_850_000, false)
.await?;
ds.insert("public.test.key", "two", 1_620_850_000, false)
.await?;
let res = ds.metadata_get_names().await?;
assert!(res.is_empty(), "should be empty");
let res = ds.get_metadata("modio.test.key").await?;
assert!(res.is_none(), "should not exist");
Ok(())
}
#[tokio::test]
async fn get_metadata_name() -> TestResult {
let ds = Datastore::temporary().await;
let res = ds
.metadata_set_name("modio.test.key", "Modio Test Key")
.await?;
assert!(res);
let res = ds
.metadata_set_name("modio.test.key", "Modio Test Key")
.await?;
assert!(!res);
let res = ds.metadata_get_names().await?;
assert_eq!(res.len(), 1, "should have one key");
assert_eq!(res[0].name, Some("Modio Test Key".into()), "Should match");
assert_eq!(res[0].n, "modio.test.key", "Should be our key");
ds.metadata_set_name("modio.test.key", "Modio Test Key Two")
.await?;
let res = ds.metadata_get_names().await?;
assert_eq!(res.len(), 1, "should have one key");
assert_eq!(
res[0].name,
Some("Modio Test Key Two".into()),
"Should match"
);
Ok(())
}
#[tokio::test]
async fn get_metadata_description() -> TestResult {
let ds = Datastore::temporary().await;
ds.metadata_set_description("modio.test.key", "Modio Test description")
.await?;
let res = ds.metadata_get_descriptions().await?;
assert_eq!(res.len(), 1, "should have one key");
assert_eq!(
res[0].description,
Some("Modio Test description".into()),
"Should match"
);
assert_eq!(res[0].n, "modio.test.key", "Should be our key");
ds.metadata_set_description(
"modio.test.key",
"The second update is to change the description",
)
.await?;
let res = ds.metadata_get_descriptions().await?;
assert_eq!(res.len(), 1, "should have one key");
Ok(())
}
#[tokio::test]
async fn get_metadata_unit() -> TestResult {
let ds = Datastore::temporary().await;
ds.metadata_set_unit("modio.test.key", "Cel").await?;
let res = ds.metadata_get_units().await?;
assert_eq!(res.len(), 1, "should be one item");
assert_eq!(res[0].u, Some("Cel".into()), "Should be Celsius");
assert_eq!(res[0].n, "modio.test.key", "Should be our key");
let status = ds.metadata_set_unit("modio.test.key", "m").await;
assert!(status.is_err(), "Should not be able to replace unit");
let res = ds.metadata_get_units().await?;
assert_eq!(res[0].u, Some("Cel".into()), "Should still be Celsius");
Ok(())
}
#[tokio::test]
async fn set_unit_unique() -> TestResult {
let ds = Datastore::temporary().await;
ds.metadata_set_unit("modio.test.key", "Cel").await?;
ds.metadata_set_unit("modio.test.key", "Cel").await?;
let err = ds
.metadata_set_unit("modio.test.key", "m")
.await
.expect_err("Should get unique constraint failed");
assert_eq!(err.to_string(), "Unique constraint failed");
Ok(())
}
#[tokio::test]
async fn get_metadata_enum() -> TestResult {
let ds = Datastore::temporary().await;
let value_map = ValueMap::from([
(0, "error".to_string()),
(1, "enabled".to_string()),
(2, "disabled".to_string()),
]);
ds.metadata_set_enum("modio.test.key", &value_map).await?;
let mut res = ds.metadata_get_enum().await?;
assert_eq!(res.len(), 1, "Should have one value");
assert_eq!(res[0].n, "modio.test.key");
assert!(res[0].value_map.is_some());
let entry = res.pop().unwrap();
let vmap = entry.value_map.unwrap();
assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
let humm = ds.get_metadata("modio.test.key").await?;
assert!(humm.is_some());
let humm = humm.unwrap();
assert_eq!(humm.n, "modio.test.key");
assert!(humm.value_map.is_some());
let vmap = humm.value_map.unwrap();
assert_eq!(vmap.get(&0).unwrap(), &"error".to_string());
assert_eq!(vmap.get(&1).unwrap(), &"enabled".to_string());
assert_eq!(vmap.get(&2).unwrap(), &"disabled".to_string());
Ok(())
}
#[tokio::test]
async fn get_all_metadata() -> TestResult {
let ds = Datastore::temporary().await;
let value_map = ValueMap::from([
(0, "error".to_string()),
(1, "enabled".to_string()),
(2, "disabled".to_string()),
]);
ds.metadata_set_name("modio.test.dupe", "Modio Test Another Key")
.await?;
ds.metadata_set_enum("modio.test.key", &value_map).await?;
ds.metadata_set_name("modio.test.key", "Modio Test Key")
.await?;
ds.metadata_set_unit("modio.test.key", "Cel").await?;
ds.metadata_set_description("modio.test.key", "Our Description")
.await?;
let mut res = ds.get_all_metadata().await?;
            assert_eq!(res.len(), 2, "should have two keys");
let mut filt: Vec<Metadata> =
res.drain(..).filter(|x| x.n == "modio.test.key").collect();
let obj = filt.pop().unwrap();
assert_eq!(obj.n, "modio.test.key");
assert_eq!(obj.name, Some("Modio Test Key".into()));
assert_eq!(obj.description, Some("Our Description".into()));
assert_eq!(obj.u, Some("Cel".into()));
assert_eq!(obj.value_map.unwrap().get(&0), Some(&"error".to_string()));
Ok(())
}
}
}