use std::sync::{Arc, Mutex};
use std::time::{SystemTime, UNIX_EPOCH};
use mlua::prelude::*;
use rusqlite::Connection;
use crate::bridge::{json_to_lua, lua_to_json};
use crate::host::HostContext;
/// Checks that a tag key is safe to splice into a SQLite JSON path (`$.<key>`).
///
/// A key is accepted only when it is non-empty and consists solely of ASCII
/// letters, ASCII digits and `_`.
///
/// # Errors
///
/// Returns an external Lua error for an empty key or for a key containing any
/// other character.
fn validate_tag_key(key: &str) -> LuaResult<()> {
    // `all` is vacuously true for "", so emptiness must be checked on its own:
    // the message promises `+` (one or more characters), and an empty key
    // would turn into the JSON path `$.`.
    let well_formed =
        !key.is_empty() && key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_');
    if well_formed {
        Ok(())
    } else {
        Err(LuaError::external(
            "ts tag key must be [a-zA-Z0-9_]+".to_string(),
        ))
    }
}
/// Builds the SQL text for a query against the `ts` table.
///
/// Every statement filters on `series = ? AND ts >= ? AND ts <= ?`, followed by
/// one `json_extract(tags, '$.<key>') = ?` clause per entry of `tag_keys`.
/// Callers therefore bind the series, the lower bound, the upper bound and then
/// one value per tag key, in that order.
///
/// Statement shapes:
/// - `agg == None`: rows of `ts, value, tags` ordered by `ts, rowid`, with the
///   optional `LIMIT`/`OFFSET` appended.
/// - `agg == Some("last")` without a bucket: the single newest row as
///   `value, tags, ts` (`rowid` breaks timestamp ties).
/// - `count` / `sum` / `avg` without a bucket: one aggregate column.
/// - any aggregate with `bucket_ms`: `bucket_ts, agg_value` per bucket, ordered
///   by bucket, with the optional `LIMIT`/`OFFSET` appended.
///
/// `limit` and `offset` are not used by the two non-bucketed aggregate shapes.
///
/// NOTE(review): for bucketed `last` the aggregate column is `MAX(ts)`, i.e.
/// the newest timestamp in the bucket rather than the stored value — confirm
/// that this is intended.
///
/// # Errors
///
/// Returns `Err` when a tag key is empty or contains a character outside
/// `[a-zA-Z0-9_]`, when `agg` names an unknown aggregate, or when a bucketed
/// query is requested with a non-positive `bucket_ms`.
fn build_query_sql(
    agg: Option<&str>,
    bucket_ms: Option<i64>,
    tag_keys: &[String],
    limit: Option<i64>,
    offset: Option<i64>,
) -> Result<String, String> {
    // Tag keys are the only caller-supplied text spliced into the statement
    // (their values are bound as parameters), so refuse anything that could
    // escape the quoted JSON path even if a caller forgot to validate.
    let keys_are_safe = tag_keys
        .iter()
        .all(|k| !k.is_empty() && k.chars().all(|c| c.is_ascii_alphanumeric() || c == '_'));
    if !keys_are_safe {
        return Err("ts tag key must be [a-zA-Z0-9_]+".to_string());
    }

    let tag_clauses: String = tag_keys
        .iter()
        .map(|k| format!(" AND json_extract(tags, '$.{k}') = ?"))
        .collect();
    let where_clause = format!("WHERE series = ? AND ts >= ? AND ts <= ?{tag_clauses}");

    // SQLite has no OFFSET without LIMIT; `LIMIT -1` means "no limit".
    let limit_clause = match (limit, offset) {
        (Some(l), Some(o)) => format!(" LIMIT {l} OFFSET {o}"),
        (Some(l), None) => format!(" LIMIT {l}"),
        (None, Some(o)) => format!(" LIMIT -1 OFFSET {o}"),
        (None, None) => String::new(),
    };

    let agg_name = match agg {
        Some(name) => name,
        None => {
            return Ok(format!(
                "SELECT ts, value, tags FROM ts {where_clause} ORDER BY ts, rowid{limit_clause}"
            ));
        }
    };

    // `None` stands for `last`, which has no plain SQL aggregate expression
    // and is handled separately in each shape below.
    let agg_expr: Option<&str> = match agg_name {
        "count" => Some("COUNT(*)"),
        "sum" => Some("SUM(CAST(value AS REAL))"),
        "avg" => Some("AVG(CAST(value AS REAL))"),
        "last" => None,
        other => return Err(format!("unknown agg: {other}")),
    };

    match (bucket_ms, agg_expr) {
        (None, None) => Ok(format!(
            "SELECT value, tags, ts FROM ts {where_clause} ORDER BY ts DESC, rowid DESC LIMIT 1"
        )),
        (None, Some(expr)) => Ok(format!("SELECT {expr} FROM ts {where_clause}")),
        (Some(bms), _) => {
            // A zero width would emit `ts / 0`, and a negative one has no
            // sensible meaning as a bucket size.
            if bms <= 0 {
                return Err("ts bucket_ms must be positive".to_string());
            }
            let expr = agg_expr.unwrap_or("MAX(ts)");
            Ok(format!(
                "SELECT (ts / {bms}) * {bms} AS bucket_ts, {expr} AS agg_value \
                 FROM ts {where_clause} \
                 GROUP BY bucket_ts ORDER BY bucket_ts{limit_clause}"
            ))
        }
    }
}
/// Installs the `std.ts` API into the Lua state.
///
/// Creates the `ts` table and its `(series, ts)` index on the host's
/// connection when they do not exist yet, exposes `append`, `query` and `last`
/// as fields of a new table stored at `std.ts`, and finally executes the
/// bundled `ts_tools.lua` chunk.
///
/// # Errors
///
/// Fails when the connection mutex is poisoned, when the DDL fails, when the
/// global `std` table cannot be read, or when the bundled chunk raises.
pub fn register(lua: &Lua, ctx: &HostContext) -> LuaResult<()> {
    // Keep the guard in its own scope so the connection is unlocked again
    // before any of the Lua-facing closures are created.
    {
        let guard = ctx.ts_conn.lock().map_err(|e| {
            tracing::warn!(error = %e, "ts conn lock poisoned during DDL");
            LuaError::external(format!("ts conn lock poisoned: {e}"))
        })?;
        let ddl_outcome = guard.execute_batch(
            "CREATE TABLE IF NOT EXISTS ts \
             (series TEXT NOT NULL, ts INTEGER NOT NULL, \
             tags TEXT, value TEXT NOT NULL); \
             CREATE INDEX IF NOT EXISTS idx_ts_series_ts ON ts(series, ts);",
        );
        if let Err(e) = ddl_outcome {
            tracing::warn!(error = %e, "ts ddl failed");
            return Err(LuaError::external(format!("ts DDL: {e}")));
        }
    }

    let shared_conn = &ctx.ts_conn;
    let api = lua.create_table()?;
    api.set("append", make_append(lua, Arc::clone(shared_conn))?)?;
    api.set("query", make_query(lua, Arc::clone(shared_conn))?)?;
    api.set("last", make_last(lua, Arc::clone(shared_conn))?)?;

    let std_table: LuaTable = lua.globals().get("std")?;
    std_table.set("ts", api)?;

    lua.load(include_str!("ts_tools.lua"))
        .set_name("std.ts.register_tools")
        .exec()
}
/// Builds the async Lua function behind `std.ts.append(series, value, tags, at)`.
///
/// The value (and the tag table, when given) is converted to JSON text and
/// inserted as one row of the `ts` table. `at` is the row's timestamp; when it
/// is omitted the current wall-clock time in milliseconds since the Unix epoch
/// is used. Every tag key must pass `validate_tag_key`. The insert itself runs
/// on tokio's blocking pool. The Lua function returns `nil`.
fn make_append(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
    lua.create_async_function(
        move |lua, (series, value, tags, at): (String, LuaValue, Option<LuaTable>, Option<i64>)| {
            let db = Arc::clone(&conn);
            async move {
                tracing::trace!(series = %series, "ts.append");

                let ts_ms = match at {
                    Some(explicit) => explicit,
                    // A clock set before the epoch collapses to 0 via `unwrap_or_default`.
                    None => SystemTime::now()
                        .duration_since(UNIX_EPOCH)
                        .unwrap_or_default()
                        .as_millis() as i64,
                };

                let value_as_json = lua_to_json(&lua, value).map_err(LuaError::external)?;
                let encoded_value =
                    serde_json::to_string(&value_as_json).map_err(LuaError::external)?;

                let encoded_tags: Option<String> = if let Some(tag_table) = tags {
                    // Check every key before anything is serialized or written.
                    for entry in tag_table.clone().pairs::<String, LuaValue>() {
                        let (key, _) = entry?;
                        validate_tag_key(&key)?;
                    }
                    let tags_as_json = lua_to_json(&lua, LuaValue::Table(tag_table))
                        .map_err(LuaError::external)?;
                    Some(serde_json::to_string(&tags_as_json).map_err(LuaError::external)?)
                } else {
                    None
                };

                let joined = tokio::task::spawn_blocking(move || -> Result<(), String> {
                    let guard = db
                        .lock()
                        .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
                    guard
                        .execute(
                            "INSERT INTO ts (series, ts, tags, value) VALUES (?1, ?2, ?3, ?4)",
                            rusqlite::params![series, ts_ms, encoded_tags, encoded_value],
                        )
                        .map_err(|e| format!("ts append: {e}"))?;
                    Ok(())
                })
                .await;

                // First layer: the blocking task itself; second layer: the insert.
                let outcome =
                    joined.map_err(|e| LuaError::external(format!("ts task: {e}")))?;
                if let Err(e) = outcome {
                    tracing::warn!(error = %e, "ts append failed");
                    return Err(LuaError::external(e));
                }
                Ok(LuaValue::Nil)
            }
        },
    )
}
fn make_query(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
lua.create_async_function(move |lua, (series, opts): (String, Option<LuaTable>)| {
let conn = Arc::clone(&conn);
async move {
tracing::trace!(series = %series, "ts.query");
let from_ts: i64 = opts
.as_ref()
.and_then(|t| t.get::<Option<i64>>("from").ok().flatten())
.unwrap_or(i64::MIN);
let to_ts: i64 = opts
.as_ref()
.and_then(|t| t.get::<Option<i64>>("to").ok().flatten())
.unwrap_or(i64::MAX);
let agg: Option<String> = opts
.as_ref()
.and_then(|t| t.get::<Option<String>>("agg").ok().flatten());
let bucket_ms: Option<i64> = opts
.as_ref()
.and_then(|t| t.get::<Option<i64>>("bucket_ms").ok().flatten());
let limit: Option<i64> = opts
.as_ref()
.and_then(|t| t.get::<Option<i64>>("limit").ok().flatten());
let offset: Option<i64> = opts
.as_ref()
.and_then(|t| t.get::<Option<i64>>("offset").ok().flatten());
if let Some(bms) = bucket_ms {
if bms <= 0 {
return Err(LuaError::external(
"ts bucket_ms must be positive".to_string(),
));
}
if agg.is_none() {
return Err(LuaError::external("ts bucket_ms requires agg".to_string()));
}
}
if let Some(l) = limit {
if l < 0 {
return Err(LuaError::external("ts opts.limit must be >= 0".to_string()));
}
}
if let Some(o) = offset {
if o < 0 {
return Err(LuaError::external(
"ts opts.offset must be >= 0".to_string(),
));
}
}
let tags_filter: Vec<(String, String)> = match opts
.as_ref()
.and_then(|t| t.get::<Option<LuaTable>>("tags").ok().flatten())
{
None => vec![],
Some(tbl) => {
let mut pairs = Vec::new();
for p in tbl.pairs::<String, LuaValue>() {
let (k, v) = p?;
validate_tag_key(&k)?;
let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
let v_str = match &v_json {
serde_json::Value::String(s) => s.clone(),
other => serde_json::to_string(other).map_err(LuaError::external)?,
};
pairs.push((k, v_str));
}
pairs
}
};
let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
let agg_ref = agg.as_deref();
let sql = build_query_sql(agg_ref, bucket_ms, &tag_keys, limit, offset)
.map_err(LuaError::external)?;
let is_single_agg = agg.is_some() && bucket_ms.is_none();
let is_last_single = agg.as_deref() == Some("last") && bucket_ms.is_none();
let is_bucketed = agg.is_some() && bucket_ms.is_some();
let rows_raw: Result<Vec<Vec<Option<String>>>, String> =
tokio::task::spawn_blocking(move || {
let conn = conn
.lock()
.map_err(|e| format!("ts conn lock poisoned: {e}"))?;
let mut stmt = conn
.prepare(&sql)
.map_err(|e| format!("ts query prepare: {e}"))?;
let mut params: Vec<Box<dyn rusqlite::ToSql>> =
vec![Box::new(series), Box::new(from_ts), Box::new(to_ts)];
for v in tag_values {
params.push(Box::new(v));
}
let param_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let col_count = stmt.column_count();
let rows: Vec<Vec<Option<String>>> = stmt
.query(param_refs.as_slice())
.map_err(|e| format!("ts query exec: {e}"))?
.mapped(|row| {
let mut cols = Vec::with_capacity(col_count);
for i in 0..col_count {
let v: rusqlite::types::Value =
row.get::<_, rusqlite::types::Value>(i)?;
let s = match v {
rusqlite::types::Value::Null => None,
rusqlite::types::Value::Integer(n) => Some(n.to_string()),
rusqlite::types::Value::Real(f) => Some(f.to_string()),
rusqlite::types::Value::Text(s) => Some(s),
rusqlite::types::Value::Blob(_) => None,
};
cols.push(s);
}
Ok(cols)
})
.collect::<Result<_, _>>()
.map_err(|e| format!("ts query row: {e}"))?;
Ok(rows)
})
.await
.map_err(|e| LuaError::external(format!("ts task: {e}")))?;
let rows_raw = rows_raw.map_err(|e| {
tracing::warn!(error = %e, "ts query failed");
LuaError::external(e)
})?;
let result_table = lua.create_table()?;
if is_last_single {
for (idx, row) in rows_raw.iter().enumerate() {
let row_tbl = lua.create_table()?;
let value_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
row_tbl.set("value", value_lv)?;
let tags_lv = decode_tags_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
row_tbl.set("tags", tags_lv)?;
let ts_lv = if let Some(Some(s)) = row.get(2) {
let n: i64 = s.parse().map_err(LuaError::external)?;
LuaValue::Integer(n)
} else {
LuaValue::Nil
};
row_tbl.set("ts", ts_lv)?;
result_table.set(idx + 1, row_tbl)?;
}
} else if is_single_agg {
for (idx, row) in rows_raw.iter().enumerate() {
let row_tbl = lua.create_table()?;
let agg_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
row_tbl.set("value", agg_lv)?;
result_table.set(idx + 1, row_tbl)?;
}
} else if is_bucketed {
for (idx, row) in rows_raw.iter().enumerate() {
let row_tbl = lua.create_table()?;
let bts_lv = if let Some(Some(s)) = row.first() {
let n: i64 = s.parse().map_err(LuaError::external)?;
LuaValue::Integer(n)
} else {
LuaValue::Nil
};
row_tbl.set("bucket_ts", bts_lv)?;
let agg_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
row_tbl.set("value", agg_lv)?;
result_table.set(idx + 1, row_tbl)?;
}
} else {
for (idx, row) in rows_raw.iter().enumerate() {
let row_tbl = lua.create_table()?;
let ts_lv = if let Some(Some(s)) = row.first() {
let n: i64 = s.parse().map_err(LuaError::external)?;
LuaValue::Integer(n)
} else {
LuaValue::Nil
};
row_tbl.set("ts", ts_lv)?;
let value_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
row_tbl.set("value", value_lv)?;
let tags_lv = decode_tags_col(&lua, row.get(2).and_then(|s| s.as_deref()))?;
row_tbl.set("tags", tags_lv)?;
result_table.set(idx + 1, row_tbl)?;
}
}
Ok(LuaValue::Table(result_table))
}
})
}
fn make_last(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
lua.create_async_function(move |lua, (series, tags): (String, Option<LuaTable>)| {
let conn = Arc::clone(&conn);
async move {
tracing::trace!(series = %series, "ts.last");
let tags_filter: Vec<(String, String)> = match tags {
None => vec![],
Some(tbl) => {
let mut pairs = Vec::new();
for p in tbl.pairs::<String, LuaValue>() {
let (k, v) = p?;
validate_tag_key(&k)?;
let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
let v_str = match &v_json {
serde_json::Value::String(s) => s.clone(),
other => serde_json::to_string(other).map_err(LuaError::external)?,
};
pairs.push((k, v_str));
}
pairs
}
};
let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
let tag_clauses: String = tag_keys
.iter()
.map(|k| format!(" AND json_extract(tags, '$.{k}') = ?"))
.collect();
let sql = format!(
"SELECT ts, value, tags FROM ts \
WHERE series = ? AND ts >= ? AND ts <= ?{tag_clauses} \
ORDER BY ts DESC, rowid DESC LIMIT 1"
);
let row_raw: Result<Option<(i64, String, Option<String>)>, String> =
tokio::task::spawn_blocking(move || {
let conn = conn
.lock()
.map_err(|e| format!("ts conn lock poisoned: {e}"))?;
let mut stmt = conn
.prepare(&sql)
.map_err(|e| format!("ts last prepare: {e}"))?;
let mut params: Vec<Box<dyn rusqlite::ToSql>> =
vec![Box::new(series), Box::new(i64::MIN), Box::new(i64::MAX)];
for v in tag_values {
params.push(Box::new(v));
}
let param_refs: Vec<&dyn rusqlite::ToSql> =
params.iter().map(|p| p.as_ref()).collect();
let mut rows = stmt
.query(param_refs.as_slice())
.map_err(|e| format!("ts last query: {e}"))?;
if let Some(row) = rows.next().map_err(|e| format!("ts last row: {e}"))? {
let ts_val: i64 = row.get(0).map_err(|e| format!("ts last ts col: {e}"))?;
let value_str: String =
row.get(1).map_err(|e| format!("ts last value col: {e}"))?;
let tags_str: Option<String> =
row.get(2).map_err(|e| format!("ts last tags col: {e}"))?;
Ok(Some((ts_val, value_str, tags_str)))
} else {
Ok(None)
}
})
.await
.map_err(|e| LuaError::external(format!("ts task: {e}")))?;
let row_opt = row_raw.map_err(|e| {
tracing::warn!(error = %e, "ts last failed");
LuaError::external(e)
})?;
match row_opt {
None => Ok(LuaValue::Nil),
Some((ts_val, value_str, tags_str)) => {
let row_tbl = lua.create_table()?;
row_tbl.set("ts", LuaValue::Integer(ts_val))?;
let v_json: serde_json::Value =
serde_json::from_str(&value_str).map_err(LuaError::external)?;
let v_lv = json_to_lua(&lua, v_json)?;
row_tbl.set("value", v_lv)?;
let tags_lv = decode_tags_col(&lua, tags_str.as_deref())?;
row_tbl.set("tags", tags_lv)?;
Ok(LuaValue::Table(row_tbl))
}
}
}
})
}
/// Decodes one JSON-text cell into a Lua value.
///
/// A missing cell (`None`) becomes `nil`; otherwise the text is parsed as JSON
/// and converted with `json_to_lua`.
///
/// # Errors
///
/// Returns an external Lua error when the text is not valid JSON, or whatever
/// `json_to_lua` reports.
fn decode_value_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
    raw.map_or(Ok(LuaValue::Nil), |text| {
        serde_json::from_str::<serde_json::Value>(text)
            .map_err(LuaError::external)
            .and_then(|parsed| json_to_lua(lua, parsed))
    })
}
/// Decodes the `tags` cell of a row into a Lua value.
///
/// Tags are decoded exactly like values, so this simply forwards to
/// `decode_value_col`: `None` becomes `nil`, anything else is parsed as JSON.
fn decode_tags_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
decode_value_col(lua, raw)
}
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::{params, Connection};

    /// Opens an in-memory database holding an empty `ts` table and its index.
    fn empty_ts_db() -> Connection {
        let db = Connection::open_in_memory().expect("open in-memory sqlite");
        db.execute_batch(
            "CREATE TABLE ts (series TEXT NOT NULL, ts INTEGER NOT NULL, \
             tags TEXT, value TEXT NOT NULL); \
             CREATE INDEX idx_ts_series_ts ON ts(series, ts);",
        )
        .expect("ddl");
        db
    }

    /// Rows sharing one timestamp must come back in insertion order on the
    /// raw (non-aggregated) path.
    #[test]
    fn raw_path_same_ms_preserves_insert_order() {
        let sql = build_query_sql(None, None, &[], None, None).expect("build_query_sql");
        assert!(
            sql.contains("ORDER BY ts, rowid"),
            "raw path SQL missing rowid tie-breaker: {sql}"
        );

        let db = empty_ts_db();
        let inserts = [
            ("insert 1", "\"first\""),
            ("insert 2", "\"second\""),
            ("insert 3", "\"third\""),
        ];
        for (label, payload) in inserts {
            db.execute(
                "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
                params!["s", 1000_i64, payload],
            )
            .expect(label);
        }

        let mut stmt = db.prepare(&sql).expect("prepare");
        let rows: Vec<String> = stmt
            .query_map(params!["s", i64::MIN, i64::MAX], |row| {
                row.get::<_, String>(1)
            })
            .expect("query")
            .collect::<Result<Vec<_>, _>>()
            .expect("collect");
        assert_eq!(
            rows,
            ["\"first\"", "\"second\"", "\"third\""],
            "raw path returned rows in non-INSERT order: {rows:?}"
        );
    }

    /// Among rows sharing one timestamp, the "last" statement must pick the
    /// most recently inserted one.
    #[test]
    fn last_path_same_ms_returns_last_insert() {
        let sql_last =
            build_query_sql(Some("last"), None, &[], None, None).expect("build_query_sql last");
        assert!(
            sql_last.contains("ORDER BY ts DESC, rowid DESC LIMIT 1"),
            "last path SQL missing rowid DESC tie-breaker: {sql_last}"
        );

        let db = empty_ts_db();
        ["\"first\"", "\"second\"", "\"third\""]
            .iter()
            .for_each(|payload| {
                db.execute(
                    "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
                    params!["s", 1000_i64, payload],
                )
                .expect("insert");
            });

        // Hand-written copy of the statement `make_last` formats when no tag
        // filter is given.
        let make_last_sql = "SELECT ts, value, tags FROM ts \
                             WHERE series = ? AND ts >= ? AND ts <= ? \
                             ORDER BY ts DESC, rowid DESC LIMIT 1";
        let mut stmt = db.prepare(make_last_sql).expect("prepare");
        let value: String = stmt
            .query_row(params!["s", i64::MIN, i64::MAX], |row| {
                row.get::<_, String>(1)
            })
            .expect("query_row");
        assert_eq!(
            value, "\"third\"",
            "last path returned non-last INSERT value: {value}"
        );
    }
}