agent_block_core/bridge/ts.rs
1//! `std.ts.*` — SQLite-backed time-series primitive.
2//!
3//! This is the only storage bridge whose implementation is in-tree
4//! (mlua_batteries provides no TSDB module). DDL / append / query /
5//! last all live in this file.
6//!
7//! Backend: single `ts` table in `ts.sqlite` (or `:memory:`).
8//!
9//! Schema:
10//! ```sql
11//! CREATE TABLE IF NOT EXISTS ts (
12//! series TEXT NOT NULL,
13//! ts INTEGER NOT NULL,
14//! tags TEXT,
15//! value TEXT NOT NULL
16//! );
17//! CREATE INDEX IF NOT EXISTS idx_ts_series_ts ON ts(series, ts);
18//! ```
19//!
20//! Column notes:
21//! - `series`: logical stream name (e.g. `"cpu_load"`, `"agent_events"`)
22//! - `ts`: Unix timestamp in milliseconds (i64)
23//! - `tags`: JSON object (`{"task": "X", "phase": "Y"}`) or NULL; filtered
24//! via `json_extract` in queries — never compared as a serialised string
25//! - `value`: JSON-encoded payload; accepts both JSON numbers and JSON objects
26//! so that callers can append plain numeric metrics or structured MCP
27//! envelope payloads without loss (dual-type contract, Crux §3.8 C1)
28//!
29//! See `bridge/config.rs` for the ENV → path mapping (`AGENT_BLOCK_TS_PATH`).
30
31use std::sync::{Arc, Mutex};
32use std::time::{SystemTime, UNIX_EPOCH};
33
34use mlua::prelude::*;
35use rusqlite::Connection;
36
37use crate::bridge::{json_to_lua, lua_to_json};
38use crate::host::HostContext;
39
40// ── helpers ──────────────────────────────────────────────────────────────────
41
42/// Validate that every tag key contains only ASCII alphanumeric characters or
43/// underscores, guarding against SQL injection via format-string tag paths.
44///
45/// # Arguments
46///
47/// - `key`: the tag key to validate
48///
49/// # Errors
50///
51/// Returns a `LuaError` if `key` contains any character outside `[a-zA-Z0-9_]`.
52fn validate_tag_key(key: &str) -> LuaResult<()> {
53 if key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
54 Ok(())
55 } else {
56 Err(LuaError::external(
57 "ts tag key must be [a-zA-Z0-9_]+".to_string(),
58 ))
59 }
60}
61
62/// Build the SQL query string for `std.ts.query`.
63///
64/// Constructs a parameterised SQL string for one of three query shapes:
65/// - **raw** (`agg = None`): `SELECT ts, value, tags … ORDER BY ts, rowid LIMIT ? OFFSET ?`
66/// - **single-aggregate** (`agg = Some(_)`, `bucket_ms = None`):
67/// `SELECT <AGG_EXPR> FROM ts WHERE …` (single row, no LIMIT/OFFSET)
68/// - **time-bucketed** (`agg = Some(_)`, `bucket_ms = Some(_)`):
69/// `SELECT (ts/?)*? AS bucket_ts, <AGG_EXPR> … GROUP BY bucket_ts … LIMIT ? OFFSET ?`
70///
71/// The returned string uses positional `?` placeholders. The binding order is:
72/// `series, from_ts, to_ts, [tag_values…], [bucket_ms, bucket_ms], [limit, offset]`.
73///
74/// # Arguments
75///
76/// - `agg`: optional aggregation function name (`"count"`, `"sum"`, `"avg"`, `"last"`)
77/// - `bucket_ms`: optional bucket width in milliseconds (> 0)
78/// - `tag_keys`: ordered list of tag keys for the AND-filter; paths become
79/// `json_extract(tags, '$.<key>')` placeholders
80/// - `limit`: optional maximum row count (`>= 0`)
81/// - `offset`: optional row skip count (`>= 0`)
82///
83/// # Errors
84///
85/// Returns `Err(String)` for an unrecognised aggregation function name.
86fn build_query_sql(
87 agg: Option<&str>,
88 bucket_ms: Option<i64>,
89 tag_keys: &[String],
90 limit: Option<i64>,
91 offset: Option<i64>,
92) -> Result<String, String> {
93 // Build the shared WHERE clause fragment (after series / ts range filter).
94 // Each tag key adds one `AND json_extract(tags, '$.<key>') = ?` clause.
95 // This implements the Crux C2 conjunction contract: every k/v pair in
96 // opts.tags is evaluated independently via json_extract, never as a
97 // single serialised-string equality match.
98 let tag_clauses: String = tag_keys
99 .iter()
100 .map(|k| format!(" AND json_extract(tags, '$.{k}') = ?"))
101 .collect();
102
103 let where_clause = format!("WHERE series = ? AND ts >= ? AND ts <= ?{tag_clauses}");
104
105 // Helper to append LIMIT / OFFSET fragments (not used in single-agg mode).
106 let limit_clause = match (limit, offset) {
107 (Some(l), Some(o)) => format!(" LIMIT {l} OFFSET {o}"),
108 (Some(l), None) => format!(" LIMIT {l}"),
109 (None, Some(o)) => format!(" LIMIT -1 OFFSET {o}"),
110 (None, None) => String::new(),
111 };
112
113 match agg {
114 // ── path 1: raw rows ──────────────────────────────────────────────
115 None => {
116 let sql = format!(
117 "SELECT ts, value, tags FROM ts {where_clause} ORDER BY ts, rowid{limit_clause}"
118 );
119 Ok(sql)
120 }
121
122 // ── path 2 / 3: aggregate ─────────────────────────────────────────
123 Some(agg_name) => {
124 // Build the aggregate expression. Note that agg="last" is
125 // special: it is not a SQL aggregate function but an ORDER+LIMIT
126 // operation. For the time-bucketed case (path 3) we use MAX(ts)
127 // per bucket to identify the latest row, which requires the
128 // caller to do a second fetch (or we use a subquery). For
129 // simplicity the bucketed "last" uses MAX(ts) as a proxy for the
130 // last timestamp — callers needing the actual value should use
131 // a separate query. For single-agg "last" we use a full
132 // ORDER BY ts DESC LIMIT 1 subquery form.
133 let agg_expr: &str = match agg_name {
134 "count" => "COUNT(*)",
135 "sum" => "SUM(CAST(value AS REAL))",
136 "avg" => "AVG(CAST(value AS REAL))",
137 "last" => {
138 // handled specially per path below
139 "last"
140 }
141 other => return Err(format!("unknown agg: {other}")),
142 };
143
144 match bucket_ms {
145 // ── path 2: single aggregate (no bucket) ─────────────────
146 None => {
147 if agg_name == "last" {
148 // agg="last" + no bucket → ORDER BY ts DESC LIMIT 1
149 let sql = format!(
150 "SELECT value, tags, ts FROM ts {where_clause} ORDER BY ts DESC, rowid DESC LIMIT 1"
151 );
152 Ok(sql)
153 } else {
154 let sql = format!("SELECT {agg_expr} FROM ts {where_clause}");
155 Ok(sql)
156 }
157 }
158
159 // ── path 3: time-bucketed aggregate ───────────────────────
160 Some(bms) => {
161 // bucket_ts = (ts / bucket_ms) * bucket_ms (integer division)
162 // The bucket_ms literal is embedded directly into the SQL
163 // string (safe: validated positive i64, no user-supplied
164 // string content). Embedding avoids the parameter-ordering
165 // issue that arises when SELECT-clause `?` placeholders
166 // appear before WHERE-clause `?` placeholders — SQLite
167 // positional binding fills them left-to-right, so any `?`
168 // in the SELECT would consume series/from_ts/to_ts params.
169 let bucketed_agg_expr = if agg_name == "last" {
170 // For bucketed "last" we return the maximum ts in
171 // each bucket as a proxy for the last value.
172 // CAST(value AS REAL) would not be meaningful here;
173 // instead we expose MAX(ts) for the bucket boundary.
174 "MAX(ts)".to_string()
175 } else {
176 agg_expr.to_string()
177 };
178
179 let sql = format!(
180 "SELECT (ts / {bms}) * {bms} AS bucket_ts, {bucketed_agg_expr} AS agg_value \
181 FROM ts {where_clause} \
182 GROUP BY bucket_ts ORDER BY bucket_ts{limit_clause}"
183 );
184 Ok(sql)
185 }
186 }
187 }
188 }
189}
190
191// ── registration ─────────────────────────────────────────────────────────────
192
193/// Register the `std.ts` bridge into `lua`.
194///
195/// On first call this function:
196/// 1. Acquires the ts SQLite connection and runs the DDL (idempotent —
197/// `CREATE TABLE IF NOT EXISTS` / `CREATE INDEX IF NOT EXISTS`).
198/// 2. Installs `std.ts.append`, `std.ts.query`, and `std.ts.last` as async
199/// Lua functions.
200/// 3. Loads `ts_tools.lua` to provide `std.ts.register_tools`.
201///
202/// # Arguments
203///
204/// - `lua`: the Lua state to register into (main Isle or handler Isle)
205/// - `ctx`: host context providing `ts_conn` (Arc<Mutex<Connection>>)
206///
207/// # Errors
208///
209/// Returns a `LuaError` if:
210/// - the Mutex is poisoned (`ts conn lock poisoned`)
211/// - the DDL `execute_batch` fails (`ts DDL: <rusqlite error>`)
212/// - the `std` global is not a table or any `std.ts` assignment fails
213pub fn register(lua: &Lua, ctx: &HostContext) -> LuaResult<()> {
214 // ── DDL init ─────────────────────────────────────────────────────────
215 let conn = ctx.ts_conn.lock().map_err(|e| {
216 tracing::warn!(error = %e, "ts conn lock poisoned during DDL");
217 LuaError::external(format!("ts conn lock poisoned: {e}"))
218 })?;
219
220 conn.execute_batch(
221 "CREATE TABLE IF NOT EXISTS ts \
222 (series TEXT NOT NULL, ts INTEGER NOT NULL, \
223 tags TEXT, value TEXT NOT NULL); \
224 CREATE INDEX IF NOT EXISTS idx_ts_series_ts ON ts(series, ts);",
225 )
226 .map_err(|e| {
227 tracing::warn!(error = %e, "ts ddl failed");
228 LuaError::external(format!("ts DDL: {e}"))
229 })?;
230
231 drop(conn);
232
233 // ── Build std.ts table ────────────────────────────────────────────────
234 let ts_tbl = lua.create_table()?;
235
236 // ── std.ts.append ─────────────────────────────────────────────────────
237 ts_tbl.set("append", make_append(lua, Arc::clone(&ctx.ts_conn))?)?;
238
239 // ── std.ts.query ──────────────────────────────────────────────────────
240 ts_tbl.set("query", make_query(lua, Arc::clone(&ctx.ts_conn))?)?;
241
242 // ── std.ts.last ───────────────────────────────────────────────────────
243 ts_tbl.set("last", make_last(lua, Arc::clone(&ctx.ts_conn))?)?;
244
245 // ── Install into std global ───────────────────────────────────────────
246 let std_table: LuaTable = lua.globals().get("std")?;
247 std_table.set("ts", ts_tbl)?;
248
249 // ── Load ts_tools.lua (std.ts.register_tools) ─────────────────────────
250 lua.load(include_str!("ts_tools.lua"))
251 .set_name("std.ts.register_tools")
252 .exec()?;
253
254 Ok(())
255}
256
257// ── append ────────────────────────────────────────────────────────────────────
258
259/// Create the `std.ts.append(series, value, tags?, at?)` async function.
260///
261/// # Arguments
262///
263/// - `lua`: the Lua state
264/// - `conn`: shared SQLite connection
265///
266/// # Errors
267///
268/// Returns `LuaError` on Mutex poison, rusqlite error, or JSON encode error.
269fn make_append(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
270 lua.create_async_function(
271 move |lua, (series, value, tags, at): (String, LuaValue, Option<LuaTable>, Option<i64>)| {
272 let conn = Arc::clone(&conn);
273 async move {
274 tracing::trace!(series = %series, "ts.append");
275
276 // ── resolve timestamp ─────────────────────────────────────
277 // unwrap_or_default: UNIX_EPOCH-before fallback → Duration::ZERO.
278 // This is a safe fallback path, not an unguarded unwrap.
279 let ts_ms = at.unwrap_or_else(|| {
280 SystemTime::now()
281 .duration_since(UNIX_EPOCH)
282 .unwrap_or_default()
283 .as_millis() as i64
284 });
285
286 // ── encode value before entering spawn_blocking ───────────
287 // lua_to_json requires &Lua (Lua VM access) and cannot be
288 // called inside spawn_blocking. Serialise to String here
289 // in the async context, then move the String into the closure.
290 let value_json = lua_to_json(&lua, value).map_err(LuaError::external)?;
291 let value_str = serde_json::to_string(&value_json).map_err(LuaError::external)?;
292
293 // ── encode tags ───────────────────────────────────────────
294 let tags_str: Option<String> = match tags {
295 None => None,
296 Some(tbl) => {
297 // Validate all tag keys before encoding.
298 for pair in tbl.clone().pairs::<String, LuaValue>() {
299 let (k, _) = pair?;
300 validate_tag_key(&k)?;
301 }
302 let tags_json =
303 lua_to_json(&lua, LuaValue::Table(tbl)).map_err(LuaError::external)?;
304 Some(serde_json::to_string(&tags_json).map_err(LuaError::external)?)
305 }
306 };
307
308 // ── blocking SQLite insert ────────────────────────────────
309 let result = tokio::task::spawn_blocking(move || {
310 let conn = conn
311 .lock()
312 .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
313 conn.execute(
314 "INSERT INTO ts (series, ts, tags, value) VALUES (?1, ?2, ?3, ?4)",
315 rusqlite::params![series, ts_ms, tags_str, value_str],
316 )
317 .map_err(|e| format!("ts append: {e}"))?;
318 Ok::<(), String>(())
319 })
320 .await
321 .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
322
323 result.map_err(|e| {
324 tracing::warn!(error = %e, "ts append failed");
325 LuaError::external(e)
326 })?;
327
328 Ok(LuaValue::Nil)
329 }
330 },
331 )
332}
333
334// ── query ─────────────────────────────────────────────────────────────────────
335
336/// Create the `std.ts.query(series, opts)` async function.
337///
338/// `opts` fields (all optional):
339/// - `from` (integer): start timestamp ms, default `i64::MIN`
340/// - `to` (integer): end timestamp ms, default `i64::MAX`
341/// - `tags` (table): AND-filter; each k/v pair becomes a `json_extract` clause
342/// - `agg` (string): aggregation — `"count"` | `"sum"` | `"avg"` | `"last"`
343/// - `bucket_ms` (integer > 0): bucket width; requires `agg`
344/// - `limit` (integer >= 0): maximum result rows
345/// - `offset` (integer >= 0): result rows to skip
346///
347/// Returns a Lua array of row tables:
348/// - raw mode: `{ ts, value, tags }`
349/// - single-agg (agg, no bucket): `{ value }` (scalar result)
350/// - bucketed-agg: `{ bucket_ts, value }`
351///
352/// Note: `sum`/`avg` treat `value` as a JSON number via `CAST(value AS REAL)`.
353/// Rows whose `value` is a JSON object produce `0.0` in SQLite's CAST — prefer
354/// number-only series when using `sum`/`avg`.
355///
356/// # Errors
357///
358/// Returns `LuaError` on validation failure, Mutex poison, or rusqlite error.
359fn make_query(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
360 lua.create_async_function(move |lua, (series, opts): (String, Option<LuaTable>)| {
361 let conn = Arc::clone(&conn);
362 async move {
363 tracing::trace!(series = %series, "ts.query");
364
365 // ── parse opts ────────────────────────────────────────────
366 let from_ts: i64 = opts
367 .as_ref()
368 .and_then(|t| t.get::<Option<i64>>("from").ok().flatten())
369 .unwrap_or(i64::MIN);
370 let to_ts: i64 = opts
371 .as_ref()
372 .and_then(|t| t.get::<Option<i64>>("to").ok().flatten())
373 .unwrap_or(i64::MAX);
374
375 let agg: Option<String> = opts
376 .as_ref()
377 .and_then(|t| t.get::<Option<String>>("agg").ok().flatten());
378
379 let bucket_ms: Option<i64> = opts
380 .as_ref()
381 .and_then(|t| t.get::<Option<i64>>("bucket_ms").ok().flatten());
382
383 let limit: Option<i64> = opts
384 .as_ref()
385 .and_then(|t| t.get::<Option<i64>>("limit").ok().flatten());
386
387 let offset: Option<i64> = opts
388 .as_ref()
389 .and_then(|t| t.get::<Option<i64>>("offset").ok().flatten());
390
391 // ── validate opts ─────────────────────────────────────────
392 if let Some(bms) = bucket_ms {
393 if bms <= 0 {
394 return Err(LuaError::external(
395 "ts bucket_ms must be positive".to_string(),
396 ));
397 }
398 if agg.is_none() {
399 return Err(LuaError::external("ts bucket_ms requires agg".to_string()));
400 }
401 }
402 if let Some(l) = limit {
403 if l < 0 {
404 return Err(LuaError::external("ts opts.limit must be >= 0".to_string()));
405 }
406 }
407 if let Some(o) = offset {
408 if o < 0 {
409 return Err(LuaError::external(
410 "ts opts.offset must be >= 0".to_string(),
411 ));
412 }
413 }
414
415 // ── extract and validate tags filter ──────────────────────
416 // Tags k/v pairs are collected into (key, json_string) pairs
417 // before entering spawn_blocking so we can access the Lua VM.
418 let tags_filter: Vec<(String, String)> = match opts
419 .as_ref()
420 .and_then(|t| t.get::<Option<LuaTable>>("tags").ok().flatten())
421 {
422 None => vec![],
423 Some(tbl) => {
424 let mut pairs = Vec::new();
425 for p in tbl.pairs::<String, LuaValue>() {
426 let (k, v) = p?;
427 validate_tag_key(&k)?;
428 // Encode tag value as a JSON string for comparison.
429 let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
430 let v_str = match &v_json {
431 serde_json::Value::String(s) => s.clone(),
432 other => serde_json::to_string(other).map_err(LuaError::external)?,
433 };
434 pairs.push((k, v_str));
435 }
436 pairs
437 }
438 };
439
440 let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
441 let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
442
443 // ── build SQL ─────────────────────────────────────────────
444 let agg_ref = agg.as_deref();
445 let sql = build_query_sql(agg_ref, bucket_ms, &tag_keys, limit, offset)
446 .map_err(LuaError::external)?;
447
448 // ── execute in blocking thread ────────────────────────────
449 let is_single_agg = agg.is_some() && bucket_ms.is_none();
450 let is_last_single = agg.as_deref() == Some("last") && bucket_ms.is_none();
451 let is_bucketed = agg.is_some() && bucket_ms.is_some();
452
453 let rows_raw: Result<Vec<Vec<Option<String>>>, String> =
454 tokio::task::spawn_blocking(move || {
455 let conn = conn
456 .lock()
457 .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
458
459 let mut stmt = conn
460 .prepare(&sql)
461 .map_err(|e| format!("ts query prepare: {e}"))?;
462
463 // Build the parameter list dynamically.
464 // Order: series, from_ts, to_ts, [tag_values…], [bucket_ms × 2 if bucketed]
465 let mut params: Vec<Box<dyn rusqlite::ToSql>> =
466 vec![Box::new(series), Box::new(from_ts), Box::new(to_ts)];
467 for v in tag_values {
468 params.push(Box::new(v));
469 }
470 // Note: bucket_ms is embedded as a literal in the SQL
471 // (see build_query_sql path 3), so no additional params
472 // are needed for the bucketed-aggregate case.
473
474 let param_refs: Vec<&dyn rusqlite::ToSql> =
475 params.iter().map(|p| p.as_ref()).collect();
476
477 let col_count = stmt.column_count();
478 let rows: Vec<Vec<Option<String>>> = stmt
479 .query(param_refs.as_slice())
480 .map_err(|e| format!("ts query exec: {e}"))?
481 .mapped(|row| {
482 let mut cols = Vec::with_capacity(col_count);
483 for i in 0..col_count {
484 // Use Value (not String) to handle INTEGER and REAL columns.
485 // rusqlite's FromSql for String only accepts Text/Blob and
486 // returns FromSqlError::InvalidType for INTEGER or REAL values
487 // (e.g. the `ts` column, COUNT(*), SUM, AVG, bucket_ts).
488 let v: rusqlite::types::Value =
489 row.get::<_, rusqlite::types::Value>(i)?;
490 let s = match v {
491 rusqlite::types::Value::Null => None,
492 rusqlite::types::Value::Integer(n) => Some(n.to_string()),
493 rusqlite::types::Value::Real(f) => Some(f.to_string()),
494 rusqlite::types::Value::Text(s) => Some(s),
495 rusqlite::types::Value::Blob(_) => None,
496 };
497 cols.push(s);
498 }
499 Ok(cols)
500 })
501 .collect::<Result<_, _>>()
502 .map_err(|e| format!("ts query row: {e}"))?;
503
504 Ok(rows)
505 })
506 .await
507 .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
508
509 let rows_raw = rows_raw.map_err(|e| {
510 tracing::warn!(error = %e, "ts query failed");
511 LuaError::external(e)
512 })?;
513
514 // ── decode rows into Lua ───────────────────────────────────
515 // Column layout depends on the query path:
516 // raw: [ts(i64), value(text), tags(text|null)]
517 // single-agg: [agg_value(text|null)] (or [value,tags,ts] for last)
518 // bucketed: [bucket_ts(i64), agg_value(text|null)]
519 let result_table = lua.create_table()?;
520
521 if is_last_single {
522 // path 2 agg=last: columns are [value, tags, ts]
523 // Returns at most 1 row; wrap as single-element array for
524 // consistency with other agg modes.
525 for (idx, row) in rows_raw.iter().enumerate() {
526 let row_tbl = lua.create_table()?;
527 // col 0: value TEXT
528 let value_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
529 row_tbl.set("value", value_lv)?;
530 // col 1: tags TEXT|NULL
531 let tags_lv = decode_tags_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
532 row_tbl.set("tags", tags_lv)?;
533 // col 2: ts INTEGER
534 let ts_lv = if let Some(Some(s)) = row.get(2) {
535 let n: i64 = s.parse().map_err(LuaError::external)?;
536 LuaValue::Integer(n)
537 } else {
538 LuaValue::Nil
539 };
540 row_tbl.set("ts", ts_lv)?;
541 result_table.set(idx + 1, row_tbl)?;
542 }
543 } else if is_single_agg {
544 // path 2 (non-last): single column [agg_value]
545 for (idx, row) in rows_raw.iter().enumerate() {
546 let row_tbl = lua.create_table()?;
547 let agg_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
548 row_tbl.set("value", agg_lv)?;
549 result_table.set(idx + 1, row_tbl)?;
550 }
551 } else if is_bucketed {
552 // path 3: columns [bucket_ts, agg_value]
553 for (idx, row) in rows_raw.iter().enumerate() {
554 let row_tbl = lua.create_table()?;
555 // col 0: bucket_ts INTEGER (from (ts/?)*? expression)
556 let bts_lv = if let Some(Some(s)) = row.first() {
557 let n: i64 = s.parse().map_err(LuaError::external)?;
558 LuaValue::Integer(n)
559 } else {
560 LuaValue::Nil
561 };
562 row_tbl.set("bucket_ts", bts_lv)?;
563 // col 1: agg_value
564 let agg_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
565 row_tbl.set("value", agg_lv)?;
566 result_table.set(idx + 1, row_tbl)?;
567 }
568 } else {
569 // path 1: raw rows [ts, value, tags]
570 for (idx, row) in rows_raw.iter().enumerate() {
571 let row_tbl = lua.create_table()?;
572 // col 0: ts INTEGER
573 let ts_lv = if let Some(Some(s)) = row.first() {
574 let n: i64 = s.parse().map_err(LuaError::external)?;
575 LuaValue::Integer(n)
576 } else {
577 LuaValue::Nil
578 };
579 row_tbl.set("ts", ts_lv)?;
580 // col 1: value TEXT
581 let value_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
582 row_tbl.set("value", value_lv)?;
583 // col 2: tags TEXT|NULL
584 let tags_lv = decode_tags_col(&lua, row.get(2).and_then(|s| s.as_deref()))?;
585 row_tbl.set("tags", tags_lv)?;
586 result_table.set(idx + 1, row_tbl)?;
587 }
588 }
589
590 Ok(LuaValue::Table(result_table))
591 }
592 })
593}
594
595// ── last ──────────────────────────────────────────────────────────────────────
596
597/// Create the `std.ts.last(series, tags?)` async function.
598///
599/// Returns the most-recent data point for `series` (optionally filtered by
600/// `tags` using the same AND-conjunction as `std.ts.query`).
601///
602/// Return value:
603/// - `nil` — no matching row found
604/// - `{ ts = <i64>, value = <decoded>, tags = <table or nil> }` — latest row
605///
606/// # Arguments
607///
608/// - `lua`: the Lua state
609/// - `conn`: shared SQLite connection
610///
611/// # Errors
612///
613/// Returns `LuaError` on tag key validation failure, Mutex poison, or rusqlite
614/// error.
615fn make_last(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
616 lua.create_async_function(move |lua, (series, tags): (String, Option<LuaTable>)| {
617 let conn = Arc::clone(&conn);
618 async move {
619 tracing::trace!(series = %series, "ts.last");
620
621 // ── extract and validate tags filter ──────────────────────
622 let tags_filter: Vec<(String, String)> = match tags {
623 None => vec![],
624 Some(tbl) => {
625 let mut pairs = Vec::new();
626 for p in tbl.pairs::<String, LuaValue>() {
627 let (k, v) = p?;
628 validate_tag_key(&k)?;
629 let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
630 let v_str = match &v_json {
631 serde_json::Value::String(s) => s.clone(),
632 other => serde_json::to_string(other).map_err(LuaError::external)?,
633 };
634 pairs.push((k, v_str));
635 }
636 pairs
637 }
638 };
639
640 let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
641 let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
642
643 // Build tag_clauses for WHERE.
644 let tag_clauses: String = tag_keys
645 .iter()
646 .map(|k| format!(" AND json_extract(tags, '$.{k}') = ?"))
647 .collect();
648
649 let sql = format!(
650 "SELECT ts, value, tags FROM ts \
651 WHERE series = ? AND ts >= ? AND ts <= ?{tag_clauses} \
652 ORDER BY ts DESC, rowid DESC LIMIT 1"
653 );
654
655 let row_raw: Result<Option<(i64, String, Option<String>)>, String> =
656 tokio::task::spawn_blocking(move || {
657 let conn = conn
658 .lock()
659 .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
660 let mut stmt = conn
661 .prepare(&sql)
662 .map_err(|e| format!("ts last prepare: {e}"))?;
663
664 let mut params: Vec<Box<dyn rusqlite::ToSql>> =
665 vec![Box::new(series), Box::new(i64::MIN), Box::new(i64::MAX)];
666 for v in tag_values {
667 params.push(Box::new(v));
668 }
669 let param_refs: Vec<&dyn rusqlite::ToSql> =
670 params.iter().map(|p| p.as_ref()).collect();
671
672 let mut rows = stmt
673 .query(param_refs.as_slice())
674 .map_err(|e| format!("ts last query: {e}"))?;
675
676 if let Some(row) = rows.next().map_err(|e| format!("ts last row: {e}"))? {
677 let ts_val: i64 = row.get(0).map_err(|e| format!("ts last ts col: {e}"))?;
678 let value_str: String =
679 row.get(1).map_err(|e| format!("ts last value col: {e}"))?;
680 let tags_str: Option<String> =
681 row.get(2).map_err(|e| format!("ts last tags col: {e}"))?;
682 Ok(Some((ts_val, value_str, tags_str)))
683 } else {
684 Ok(None)
685 }
686 })
687 .await
688 .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
689
690 let row_opt = row_raw.map_err(|e| {
691 tracing::warn!(error = %e, "ts last failed");
692 LuaError::external(e)
693 })?;
694
695 match row_opt {
696 None => Ok(LuaValue::Nil),
697 Some((ts_val, value_str, tags_str)) => {
698 let row_tbl = lua.create_table()?;
699 row_tbl.set("ts", LuaValue::Integer(ts_val))?;
700
701 // Two-Phase decode: raw JSON string → serde_json::Value → LuaValue
702 let v_json: serde_json::Value =
703 serde_json::from_str(&value_str).map_err(LuaError::external)?;
704 let v_lv = json_to_lua(&lua, v_json)?;
705 row_tbl.set("value", v_lv)?;
706
707 let tags_lv = decode_tags_col(&lua, tags_str.as_deref())?;
708 row_tbl.set("tags", tags_lv)?;
709
710 Ok(LuaValue::Table(row_tbl))
711 }
712 }
713 }
714 })
715}
716
717// ── decode helpers ────────────────────────────────────────────────────────────
718
719/// Decode a SQLite `value` column (TEXT storing JSON) into a `LuaValue`.
720///
721/// Implements the Two-Phase deserialization pattern (Outline K-NEW):
722/// `String → serde_json::Value → LuaValue`. A NULL column (`None`) or a
723/// NULL aggregate result returns `LuaValue::Nil`.
724///
725/// # Arguments
726///
727/// - `lua`: the Lua state
728/// - `raw`: the raw column string, or `None` for SQL NULL
729///
730/// # Errors
731///
732/// Returns `LuaError` if the string is not valid JSON.
733fn decode_value_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
734 match raw {
735 None => Ok(LuaValue::Nil),
736 Some(s) => {
737 let v: serde_json::Value = serde_json::from_str(s).map_err(LuaError::external)?;
738 json_to_lua(lua, v)
739 }
740 }
741}
742
743/// Decode a SQLite `tags` column (TEXT storing a JSON object, or NULL) into a
744/// `LuaValue`.
745///
746/// NULL tags columns return `LuaValue::Nil` (row has no tags).
747///
748/// # Arguments
749///
750/// - `lua`: the Lua state
751/// - `raw`: the raw column string, or `None` for SQL NULL
752///
753/// # Errors
754///
755/// Returns `LuaError` if the string is not valid JSON.
756fn decode_tags_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
757 decode_value_col(lua, raw)
758}
759
760#[cfg(test)]
761mod tests {
762 use super::*;
763 use rusqlite::{params, Connection};
764
765 /// Same-ms INSERT order is preserved by `ORDER BY ts, rowid` (raw path).
766 ///
767 /// Inserts three rows sharing the same `ts` value into an in-memory SQLite
768 /// using the production DDL, runs the SQL produced by `build_query_sql`
769 /// (raw mode), and verifies the returned values match INSERT order exactly.
770 /// Also asserts that the generated SQL string contains the rowid tie-breaker.
771 ///
772 /// # Test categories
773 ///
774 /// - (T1) Happy path: normal INSERT + query flow with production DDL.
775 /// - (T2) Edge case: all rows share identical `ts` millisecond value.
776 /// - (T3) Regression guard: SQL string must contain `ORDER BY ts, rowid`.
777 #[test]
778 fn raw_path_same_ms_preserves_insert_order() {
779 // 1) Assert generated SQL contains the tie-breaker.
780 let sql = build_query_sql(None, None, &[], None, None).expect("build_query_sql");
781 assert!(
782 sql.contains("ORDER BY ts, rowid"),
783 "raw path SQL missing rowid tie-breaker: {sql}"
784 );
785
786 // 2) Execute against in-memory SQLite using production DDL.
787 // Safety: open_in_memory() only fails on internal SQLite allocation
788 // errors which cannot occur in a controlled test environment.
789 let conn = Connection::open_in_memory().expect("open in-memory sqlite");
790 conn.execute_batch(
791 "CREATE TABLE ts (series TEXT NOT NULL, ts INTEGER NOT NULL, \
792 tags TEXT, value TEXT NOT NULL); \
793 CREATE INDEX idx_ts_series_ts ON ts(series, ts);",
794 )
795 .expect("ddl");
796
797 // Insert three rows with identical ts=1000 ms.
798 conn.execute(
799 "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
800 params!["s", 1000_i64, "\"first\""],
801 )
802 .expect("insert 1");
803 conn.execute(
804 "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
805 params!["s", 1000_i64, "\"second\""],
806 )
807 .expect("insert 2");
808 conn.execute(
809 "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
810 params!["s", 1000_i64, "\"third\""],
811 )
812 .expect("insert 3");
813
814 // Safety: prepare() only fails if SQL is malformed; sql is generated by
815 // build_query_sql which is already tested above.
816 let mut stmt = conn.prepare(&sql).expect("prepare");
817 let rows: Vec<String> = stmt
818 .query_map(params!["s", i64::MIN, i64::MAX], |r| r.get::<_, String>(1))
819 .expect("query")
820 .collect::<Result<Vec<_>, _>>()
821 .expect("collect");
822
823 assert_eq!(
824 rows,
825 vec![
826 "\"first\"".to_string(),
827 "\"second\"".to_string(),
828 "\"third\"".to_string()
829 ],
830 "raw path returned rows in non-INSERT order: {rows:?}"
831 );
832 }
833
834 /// Same-ms `last` returns the most recently INSERTed row.
835 ///
836 /// Verifies both the SQL string produced by `build_query_sql(Some("last"), ...)`
837 /// contains `ORDER BY ts DESC, rowid DESC LIMIT 1`, and that executing an
838 /// equivalent query against in-memory SQLite returns the last INSERT value
839 /// even when all rows share the same `ts` ms.
840 ///
841 /// # Test categories
842 ///
843 /// - (T1) Happy path: last value retrieval with production DDL.
844 /// - (T2) Edge case: all rows share identical `ts` millisecond value.
845 /// - (T3) Regression guard: SQL string must contain `ORDER BY ts DESC, rowid DESC LIMIT 1`.
846 #[test]
847 fn last_path_same_ms_returns_last_insert() {
848 // 1) Assert build_query_sql last form contains tie-breaker.
849 let sql_last =
850 build_query_sql(Some("last"), None, &[], None, None).expect("build_query_sql last");
851 assert!(
852 sql_last.contains("ORDER BY ts DESC, rowid DESC LIMIT 1"),
853 "last path SQL missing rowid DESC tie-breaker: {sql_last}"
854 );
855
856 // 2) Execute the make_last-equivalent SQL against in-memory SQLite.
857 // Safety: open_in_memory() only fails on internal SQLite allocation
858 // errors which cannot occur in a controlled test environment.
859 let conn = Connection::open_in_memory().expect("open in-memory sqlite");
860 conn.execute_batch(
861 "CREATE TABLE ts (series TEXT NOT NULL, ts INTEGER NOT NULL, \
862 tags TEXT, value TEXT NOT NULL); \
863 CREATE INDEX idx_ts_series_ts ON ts(series, ts);",
864 )
865 .expect("ddl");
866
867 for v in ["\"first\"", "\"second\"", "\"third\""] {
868 conn.execute(
869 "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
870 params!["s", 1000_i64, v],
871 )
872 .expect("insert");
873 }
874
875 // make_last inline SQL form (mirrors src/bridge/ts.rs post-fix).
876 // This string is the expected form after the rowid tie-breaker fix;
877 // if make_last's format! drifts, the behaviour test below will catch it.
878 let make_last_sql = "SELECT ts, value, tags FROM ts \
879 WHERE series = ? AND ts >= ? AND ts <= ? \
880 ORDER BY ts DESC, rowid DESC LIMIT 1";
881
882 // Safety: make_last_sql is a literal string, prepare() cannot fail here.
883 let mut stmt = conn.prepare(make_last_sql).expect("prepare");
884 let value: String = stmt
885 .query_row(params!["s", i64::MIN, i64::MAX], |r| r.get::<_, String>(1))
886 .expect("query_row");
887
888 assert_eq!(
889 value, "\"third\"",
890 "last path returned non-last INSERT value: {value}"
891 );
892 }
893}