Skip to main content

agent_block_core/bridge/
ts.rs

1//! `std.ts.*` — SQLite-backed time-series primitive.
2//!
3//! This is the only storage bridge whose implementation is in-tree
4//! (mlua_batteries provides no TSDB module). DDL / append / query /
5//! last all live in this file.
6//!
7//! Backend: single `ts` table in `ts.sqlite` (or `:memory:`).
8//!
9//! Schema:
10//! ```sql
11//! CREATE TABLE IF NOT EXISTS ts (
12//!     series TEXT NOT NULL,
13//!     ts     INTEGER NOT NULL,
14//!     tags   TEXT,
15//!     value  TEXT NOT NULL
16//! );
17//! CREATE INDEX IF NOT EXISTS idx_ts_series_ts ON ts(series, ts);
18//! ```
19//!
20//! Column notes:
21//! - `series`: logical stream name (e.g. `"cpu_load"`, `"agent_events"`)
22//! - `ts`: Unix timestamp in milliseconds (i64)
23//! - `tags`: JSON object (`{"task": "X", "phase": "Y"}`) or NULL; filtered
24//!   via `json_extract` in queries — never compared as a serialised string
25//! - `value`: JSON-encoded payload; accepts both JSON numbers and JSON objects
26//!   so that callers can append plain numeric metrics or structured MCP
27//!   envelope payloads without loss (dual-type contract, Crux §3.8 C1)
28//!
29//! See `bridge/config.rs` for the ENV → path mapping (`AGENT_BLOCK_TS_PATH`).
30
31use std::sync::{Arc, Mutex};
32use std::time::{SystemTime, UNIX_EPOCH};
33
34use mlua::prelude::*;
35use rusqlite::Connection;
36
37use crate::bridge::{json_to_lua, lua_to_json};
38use crate::host::HostContext;
39
40// ── helpers ──────────────────────────────────────────────────────────────────
41
42/// Validate that every tag key contains only ASCII alphanumeric characters or
43/// underscores, guarding against SQL injection via format-string tag paths.
44///
45/// # Arguments
46///
47/// - `key`: the tag key to validate
48///
49/// # Errors
50///
51/// Returns a `LuaError` if `key` contains any character outside `[a-zA-Z0-9_]`.
52fn validate_tag_key(key: &str) -> LuaResult<()> {
53    if key.chars().all(|c| c.is_ascii_alphanumeric() || c == '_') {
54        Ok(())
55    } else {
56        Err(LuaError::external(
57            "ts tag key must be [a-zA-Z0-9_]+".to_string(),
58        ))
59    }
60}
61
/// Build the SQL query string for `std.ts.query`.
///
/// Produces one of four statement shapes:
/// - **raw** (`agg = None`):
///   `SELECT ts, value, tags … ORDER BY ts, rowid [LIMIT n [OFFSET m]]`
/// - **single-aggregate** (`agg = count|sum|avg`, `bucket_ms = None`):
///   `SELECT <AGG_EXPR> FROM ts WHERE …` (single row, LIMIT/OFFSET ignored)
/// - **single last** (`agg = "last"`, `bucket_ms = None`):
///   `SELECT value, tags, ts … ORDER BY ts DESC, rowid DESC LIMIT 1`
/// - **time-bucketed** (`agg = Some(_)`, `bucket_ms = Some(_)`):
///   `SELECT (ts / b) * b AS bucket_ts, <AGG_EXPR> AS agg_value … GROUP BY
///   bucket_ts ORDER BY bucket_ts [LIMIT n [OFFSET m]]`. For `agg = "last"`
///   the aggregate expression is `MAX(ts)`, i.e. the latest timestamp in the
///   bucket rather than the stored value.
///
/// Only the WHERE clause uses positional `?` placeholders. The binding order
/// is `series, from_ts, to_ts, [tag_values…]`. `bucket_ms`, `limit` and
/// `offset` are embedded as integer literals and must NOT be bound.
///
/// # Arguments
///
/// - `agg`: optional aggregation function name (`"count"`, `"sum"`, `"avg"`, `"last"`)
/// - `bucket_ms`: optional bucket width in milliseconds (> 0); ignored when
///   `agg` is `None`
/// - `tag_keys`: ordered list of tag keys for the AND-filter; each becomes a
///   `json_extract(tags, '$.<key>') = ?` clause
/// - `limit`: optional maximum row count (`>= 0`)
/// - `offset`: optional row skip count (`>= 0`)
///
/// # Errors
///
/// Returns `Err(String)` when:
/// - a tag key is empty or contains characters outside `[a-zA-Z0-9_]`
/// - the aggregation function name is not recognised
/// - an aggregate is bucketed with a non-positive `bucket_ms`
fn build_query_sql(
    agg: Option<&str>,
    bucket_ms: Option<i64>,
    tag_keys: &[String],
    limit: Option<i64>,
    offset: Option<i64>,
) -> Result<String, String> {
    // Tag keys are interpolated into the SQL text, so they are re-checked
    // here instead of trusting every caller to have validated them first.
    let keys_ok = tag_keys.iter().all(|k| {
        !k.is_empty() && k.bytes().all(|b| b.is_ascii_alphanumeric() || b == b'_')
    });
    if !keys_ok {
        return Err("ts tag key must be [a-zA-Z0-9_]+".to_string());
    }

    // Shared WHERE clause: series / ts range, then one independent
    // json_extract comparison per tag key (AND-conjunction, never a
    // serialised-string equality match).
    let mut where_clause = String::from("WHERE series = ? AND ts >= ? AND ts <= ?");
    for key in tag_keys {
        where_clause.push_str(&format!(" AND json_extract(tags, '$.{key}') = ?"));
    }

    // LIMIT / OFFSET fragment (unused by the single-row aggregate shapes).
    // SQLite requires a LIMIT before OFFSET; `LIMIT -1` means "no limit".
    let limit_clause = match (limit, offset) {
        (Some(l), Some(o)) => format!(" LIMIT {l} OFFSET {o}"),
        (Some(l), None) => format!(" LIMIT {l}"),
        (None, Some(o)) => format!(" LIMIT -1 OFFSET {o}"),
        (None, None) => String::new(),
    };

    // ── path 1: raw rows ──────────────────────────────────────────────────
    let agg_name = match agg {
        None => {
            return Ok(format!(
                "SELECT ts, value, tags FROM ts {where_clause} ORDER BY ts, rowid{limit_clause}"
            ));
        }
        Some(name) => name,
    };

    // `None` marks "last", which is not a SQL aggregate function and is
    // handled per path below.
    let agg_expr: Option<&str> = match agg_name {
        "count" => Some("COUNT(*)"),
        "sum" => Some("SUM(CAST(value AS REAL))"),
        "avg" => Some("AVG(CAST(value AS REAL))"),
        "last" => None,
        other => return Err(format!("unknown agg: {other}")),
    };

    match (bucket_ms, agg_expr) {
        // ── path 2: single "last" → newest row via ORDER BY … LIMIT 1 ─────
        (None, None) => Ok(format!(
            "SELECT value, tags, ts FROM ts {where_clause} ORDER BY ts DESC, rowid DESC LIMIT 1"
        )),

        // ── path 2: single aggregate (no bucket) ──────────────────────────
        (None, Some(expr)) => Ok(format!("SELECT {expr} FROM ts {where_clause}")),

        // A zero width would divide by zero inside SQLite and a negative one
        // is meaningless, so reject both before embedding the literal.
        (Some(bms), _) if bms <= 0 => Err("ts bucket_ms must be positive".to_string()),

        // ── path 3: time-bucketed aggregate ───────────────────────────────
        (Some(bms), expr) => {
            // bucket_ts = (ts / bucket_ms) * bucket_ms (integer division).
            // The width is embedded as a literal so that no `?` appears in
            // the SELECT list ahead of the WHERE-clause placeholders, which
            // positional binding would otherwise fill first.
            // Bucketed "last" exposes MAX(ts) as a proxy for the last point.
            let expr = expr.unwrap_or("MAX(ts)");
            Ok(format!(
                "SELECT (ts / {bms}) * {bms} AS bucket_ts, {expr} AS agg_value \
                 FROM ts {where_clause} \
                 GROUP BY bucket_ts ORDER BY bucket_ts{limit_clause}"
            ))
        }
    }
}
190
191// ── registration ─────────────────────────────────────────────────────────────
192
193/// Register the `std.ts` bridge into `lua`.
194///
195/// On first call this function:
196/// 1. Acquires the ts SQLite connection and runs the DDL (idempotent —
197///    `CREATE TABLE IF NOT EXISTS` / `CREATE INDEX IF NOT EXISTS`).
198/// 2. Installs `std.ts.append`, `std.ts.query`, and `std.ts.last` as async
199///    Lua functions.
200/// 3. Loads `ts_tools.lua` to provide `std.ts.register_tools`.
201///
202/// # Arguments
203///
204/// - `lua`: the Lua state to register into (main Isle or handler Isle)
205/// - `ctx`: host context providing `ts_conn` (Arc<Mutex<Connection>>)
206///
207/// # Errors
208///
209/// Returns a `LuaError` if:
210/// - the Mutex is poisoned (`ts conn lock poisoned`)
211/// - the DDL `execute_batch` fails (`ts DDL: <rusqlite error>`)
212/// - the `std` global is not a table or any `std.ts` assignment fails
213pub fn register(lua: &Lua, ctx: &HostContext) -> LuaResult<()> {
214    // ── DDL init ─────────────────────────────────────────────────────────
215    let conn = ctx.ts_conn.lock().map_err(|e| {
216        tracing::warn!(error = %e, "ts conn lock poisoned during DDL");
217        LuaError::external(format!("ts conn lock poisoned: {e}"))
218    })?;
219
220    conn.execute_batch(
221        "CREATE TABLE IF NOT EXISTS ts \
222         (series TEXT NOT NULL, ts INTEGER NOT NULL, \
223          tags TEXT, value TEXT NOT NULL); \
224         CREATE INDEX IF NOT EXISTS idx_ts_series_ts ON ts(series, ts);",
225    )
226    .map_err(|e| {
227        tracing::warn!(error = %e, "ts ddl failed");
228        LuaError::external(format!("ts DDL: {e}"))
229    })?;
230
231    drop(conn);
232
233    // ── Build std.ts table ────────────────────────────────────────────────
234    let ts_tbl = lua.create_table()?;
235
236    // ── std.ts.append ─────────────────────────────────────────────────────
237    ts_tbl.set("append", make_append(lua, Arc::clone(&ctx.ts_conn))?)?;
238
239    // ── std.ts.query ──────────────────────────────────────────────────────
240    ts_tbl.set("query", make_query(lua, Arc::clone(&ctx.ts_conn))?)?;
241
242    // ── std.ts.last ───────────────────────────────────────────────────────
243    ts_tbl.set("last", make_last(lua, Arc::clone(&ctx.ts_conn))?)?;
244
245    // ── Install into std global ───────────────────────────────────────────
246    let std_table: LuaTable = lua.globals().get("std")?;
247    std_table.set("ts", ts_tbl)?;
248
249    // ── Load ts_tools.lua (std.ts.register_tools) ─────────────────────────
250    lua.load(include_str!("ts_tools.lua"))
251        .set_name("std.ts.register_tools")
252        .exec()?;
253
254    Ok(())
255}
256
257// ── append ────────────────────────────────────────────────────────────────────
258
259/// Create the `std.ts.append(series, value, tags?, at?)` async function.
260///
261/// # Arguments
262///
263/// - `lua`: the Lua state
264/// - `conn`: shared SQLite connection
265///
266/// # Errors
267///
268/// Returns `LuaError` on Mutex poison, rusqlite error, or JSON encode error.
269fn make_append(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
270    lua.create_async_function(
271        move |lua, (series, value, tags, at): (String, LuaValue, Option<LuaTable>, Option<i64>)| {
272            let conn = Arc::clone(&conn);
273            async move {
274                tracing::trace!(series = %series, "ts.append");
275
276                // ── resolve timestamp ─────────────────────────────────────
277                // unwrap_or_default: UNIX_EPOCH-before fallback → Duration::ZERO.
278                // This is a safe fallback path, not an unguarded unwrap.
279                let ts_ms = at.unwrap_or_else(|| {
280                    SystemTime::now()
281                        .duration_since(UNIX_EPOCH)
282                        .unwrap_or_default()
283                        .as_millis() as i64
284                });
285
286                // ── encode value before entering spawn_blocking ───────────
287                // lua_to_json requires &Lua (Lua VM access) and cannot be
288                // called inside spawn_blocking.  Serialise to String here
289                // in the async context, then move the String into the closure.
290                let value_json = lua_to_json(&lua, value).map_err(LuaError::external)?;
291                let value_str = serde_json::to_string(&value_json).map_err(LuaError::external)?;
292
293                // ── encode tags ───────────────────────────────────────────
294                let tags_str: Option<String> = match tags {
295                    None => None,
296                    Some(tbl) => {
297                        // Validate all tag keys before encoding.
298                        for pair in tbl.clone().pairs::<String, LuaValue>() {
299                            let (k, _) = pair?;
300                            validate_tag_key(&k)?;
301                        }
302                        let tags_json =
303                            lua_to_json(&lua, LuaValue::Table(tbl)).map_err(LuaError::external)?;
304                        Some(serde_json::to_string(&tags_json).map_err(LuaError::external)?)
305                    }
306                };
307
308                // ── blocking SQLite insert ────────────────────────────────
309                let result = tokio::task::spawn_blocking(move || {
310                    let conn = conn
311                        .lock()
312                        .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
313                    conn.execute(
314                        "INSERT INTO ts (series, ts, tags, value) VALUES (?1, ?2, ?3, ?4)",
315                        rusqlite::params![series, ts_ms, tags_str, value_str],
316                    )
317                    .map_err(|e| format!("ts append: {e}"))?;
318                    Ok::<(), String>(())
319                })
320                .await
321                .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
322
323                result.map_err(|e| {
324                    tracing::warn!(error = %e, "ts append failed");
325                    LuaError::external(e)
326                })?;
327
328                Ok(LuaValue::Nil)
329            }
330        },
331    )
332}
333
334// ── query ─────────────────────────────────────────────────────────────────────
335
336/// Create the `std.ts.query(series, opts)` async function.
337///
338/// `opts` fields (all optional):
339/// - `from` (integer): start timestamp ms, default `i64::MIN`
340/// - `to` (integer): end timestamp ms, default `i64::MAX`
341/// - `tags` (table): AND-filter; each k/v pair becomes a `json_extract` clause
342/// - `agg` (string): aggregation — `"count"` | `"sum"` | `"avg"` | `"last"`
343/// - `bucket_ms` (integer > 0): bucket width; requires `agg`
344/// - `limit` (integer >= 0): maximum result rows
345/// - `offset` (integer >= 0): result rows to skip
346///
347/// Returns a Lua array of row tables:
348/// - raw mode: `{ ts, value, tags }`
349/// - single-agg (agg, no bucket): `{ value }` (scalar result)
350/// - bucketed-agg: `{ bucket_ts, value }`
351///
352/// Note: `sum`/`avg` treat `value` as a JSON number via `CAST(value AS REAL)`.
353/// Rows whose `value` is a JSON object produce `0.0` in SQLite's CAST — prefer
354/// number-only series when using `sum`/`avg`.
355///
356/// # Errors
357///
358/// Returns `LuaError` on validation failure, Mutex poison, or rusqlite error.
359fn make_query(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
360    lua.create_async_function(move |lua, (series, opts): (String, Option<LuaTable>)| {
361        let conn = Arc::clone(&conn);
362        async move {
363            tracing::trace!(series = %series, "ts.query");
364
365            // ── parse opts ────────────────────────────────────────────
366            let from_ts: i64 = opts
367                .as_ref()
368                .and_then(|t| t.get::<Option<i64>>("from").ok().flatten())
369                .unwrap_or(i64::MIN);
370            let to_ts: i64 = opts
371                .as_ref()
372                .and_then(|t| t.get::<Option<i64>>("to").ok().flatten())
373                .unwrap_or(i64::MAX);
374
375            let agg: Option<String> = opts
376                .as_ref()
377                .and_then(|t| t.get::<Option<String>>("agg").ok().flatten());
378
379            let bucket_ms: Option<i64> = opts
380                .as_ref()
381                .and_then(|t| t.get::<Option<i64>>("bucket_ms").ok().flatten());
382
383            let limit: Option<i64> = opts
384                .as_ref()
385                .and_then(|t| t.get::<Option<i64>>("limit").ok().flatten());
386
387            let offset: Option<i64> = opts
388                .as_ref()
389                .and_then(|t| t.get::<Option<i64>>("offset").ok().flatten());
390
391            // ── validate opts ─────────────────────────────────────────
392            if let Some(bms) = bucket_ms {
393                if bms <= 0 {
394                    return Err(LuaError::external(
395                        "ts bucket_ms must be positive".to_string(),
396                    ));
397                }
398                if agg.is_none() {
399                    return Err(LuaError::external("ts bucket_ms requires agg".to_string()));
400                }
401            }
402            if let Some(l) = limit {
403                if l < 0 {
404                    return Err(LuaError::external("ts opts.limit must be >= 0".to_string()));
405                }
406            }
407            if let Some(o) = offset {
408                if o < 0 {
409                    return Err(LuaError::external(
410                        "ts opts.offset must be >= 0".to_string(),
411                    ));
412                }
413            }
414
415            // ── extract and validate tags filter ──────────────────────
416            // Tags k/v pairs are collected into (key, json_string) pairs
417            // before entering spawn_blocking so we can access the Lua VM.
418            let tags_filter: Vec<(String, String)> = match opts
419                .as_ref()
420                .and_then(|t| t.get::<Option<LuaTable>>("tags").ok().flatten())
421            {
422                None => vec![],
423                Some(tbl) => {
424                    let mut pairs = Vec::new();
425                    for p in tbl.pairs::<String, LuaValue>() {
426                        let (k, v) = p?;
427                        validate_tag_key(&k)?;
428                        // Encode tag value as a JSON string for comparison.
429                        let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
430                        let v_str = match &v_json {
431                            serde_json::Value::String(s) => s.clone(),
432                            other => serde_json::to_string(other).map_err(LuaError::external)?,
433                        };
434                        pairs.push((k, v_str));
435                    }
436                    pairs
437                }
438            };
439
440            let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
441            let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
442
443            // ── build SQL ─────────────────────────────────────────────
444            let agg_ref = agg.as_deref();
445            let sql = build_query_sql(agg_ref, bucket_ms, &tag_keys, limit, offset)
446                .map_err(LuaError::external)?;
447
448            // ── execute in blocking thread ────────────────────────────
449            let is_single_agg = agg.is_some() && bucket_ms.is_none();
450            let is_last_single = agg.as_deref() == Some("last") && bucket_ms.is_none();
451            let is_bucketed = agg.is_some() && bucket_ms.is_some();
452
453            let rows_raw: Result<Vec<Vec<Option<String>>>, String> =
454                tokio::task::spawn_blocking(move || {
455                    let conn = conn
456                        .lock()
457                        .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
458
459                    let mut stmt = conn
460                        .prepare(&sql)
461                        .map_err(|e| format!("ts query prepare: {e}"))?;
462
463                    // Build the parameter list dynamically.
464                    // Order: series, from_ts, to_ts, [tag_values…], [bucket_ms × 2 if bucketed]
465                    let mut params: Vec<Box<dyn rusqlite::ToSql>> =
466                        vec![Box::new(series), Box::new(from_ts), Box::new(to_ts)];
467                    for v in tag_values {
468                        params.push(Box::new(v));
469                    }
470                    // Note: bucket_ms is embedded as a literal in the SQL
471                    // (see build_query_sql path 3), so no additional params
472                    // are needed for the bucketed-aggregate case.
473
474                    let param_refs: Vec<&dyn rusqlite::ToSql> =
475                        params.iter().map(|p| p.as_ref()).collect();
476
477                    let col_count = stmt.column_count();
478                    let rows: Vec<Vec<Option<String>>> = stmt
479                        .query(param_refs.as_slice())
480                        .map_err(|e| format!("ts query exec: {e}"))?
481                        .mapped(|row| {
482                            let mut cols = Vec::with_capacity(col_count);
483                            for i in 0..col_count {
484                                // Use Value (not String) to handle INTEGER and REAL columns.
485                                // rusqlite's FromSql for String only accepts Text/Blob and
486                                // returns FromSqlError::InvalidType for INTEGER or REAL values
487                                // (e.g. the `ts` column, COUNT(*), SUM, AVG, bucket_ts).
488                                let v: rusqlite::types::Value =
489                                    row.get::<_, rusqlite::types::Value>(i)?;
490                                let s = match v {
491                                    rusqlite::types::Value::Null => None,
492                                    rusqlite::types::Value::Integer(n) => Some(n.to_string()),
493                                    rusqlite::types::Value::Real(f) => Some(f.to_string()),
494                                    rusqlite::types::Value::Text(s) => Some(s),
495                                    rusqlite::types::Value::Blob(_) => None,
496                                };
497                                cols.push(s);
498                            }
499                            Ok(cols)
500                        })
501                        .collect::<Result<_, _>>()
502                        .map_err(|e| format!("ts query row: {e}"))?;
503
504                    Ok(rows)
505                })
506                .await
507                .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
508
509            let rows_raw = rows_raw.map_err(|e| {
510                tracing::warn!(error = %e, "ts query failed");
511                LuaError::external(e)
512            })?;
513
514            // ── decode rows into Lua ───────────────────────────────────
515            // Column layout depends on the query path:
516            //   raw:         [ts(i64), value(text), tags(text|null)]
517            //   single-agg:  [agg_value(text|null)]  (or [value,tags,ts] for last)
518            //   bucketed:    [bucket_ts(i64), agg_value(text|null)]
519            let result_table = lua.create_table()?;
520
521            if is_last_single {
522                // path 2 agg=last: columns are [value, tags, ts]
523                // Returns at most 1 row; wrap as single-element array for
524                // consistency with other agg modes.
525                for (idx, row) in rows_raw.iter().enumerate() {
526                    let row_tbl = lua.create_table()?;
527                    // col 0: value TEXT
528                    let value_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
529                    row_tbl.set("value", value_lv)?;
530                    // col 1: tags TEXT|NULL
531                    let tags_lv = decode_tags_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
532                    row_tbl.set("tags", tags_lv)?;
533                    // col 2: ts INTEGER
534                    let ts_lv = if let Some(Some(s)) = row.get(2) {
535                        let n: i64 = s.parse().map_err(LuaError::external)?;
536                        LuaValue::Integer(n)
537                    } else {
538                        LuaValue::Nil
539                    };
540                    row_tbl.set("ts", ts_lv)?;
541                    result_table.set(idx + 1, row_tbl)?;
542                }
543            } else if is_single_agg {
544                // path 2 (non-last): single column [agg_value]
545                for (idx, row) in rows_raw.iter().enumerate() {
546                    let row_tbl = lua.create_table()?;
547                    let agg_lv = decode_value_col(&lua, row.first().and_then(|s| s.as_deref()))?;
548                    row_tbl.set("value", agg_lv)?;
549                    result_table.set(idx + 1, row_tbl)?;
550                }
551            } else if is_bucketed {
552                // path 3: columns [bucket_ts, agg_value]
553                for (idx, row) in rows_raw.iter().enumerate() {
554                    let row_tbl = lua.create_table()?;
555                    // col 0: bucket_ts INTEGER (from (ts/?)*? expression)
556                    let bts_lv = if let Some(Some(s)) = row.first() {
557                        let n: i64 = s.parse().map_err(LuaError::external)?;
558                        LuaValue::Integer(n)
559                    } else {
560                        LuaValue::Nil
561                    };
562                    row_tbl.set("bucket_ts", bts_lv)?;
563                    // col 1: agg_value
564                    let agg_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
565                    row_tbl.set("value", agg_lv)?;
566                    result_table.set(idx + 1, row_tbl)?;
567                }
568            } else {
569                // path 1: raw rows [ts, value, tags]
570                for (idx, row) in rows_raw.iter().enumerate() {
571                    let row_tbl = lua.create_table()?;
572                    // col 0: ts INTEGER
573                    let ts_lv = if let Some(Some(s)) = row.first() {
574                        let n: i64 = s.parse().map_err(LuaError::external)?;
575                        LuaValue::Integer(n)
576                    } else {
577                        LuaValue::Nil
578                    };
579                    row_tbl.set("ts", ts_lv)?;
580                    // col 1: value TEXT
581                    let value_lv = decode_value_col(&lua, row.get(1).and_then(|s| s.as_deref()))?;
582                    row_tbl.set("value", value_lv)?;
583                    // col 2: tags TEXT|NULL
584                    let tags_lv = decode_tags_col(&lua, row.get(2).and_then(|s| s.as_deref()))?;
585                    row_tbl.set("tags", tags_lv)?;
586                    result_table.set(idx + 1, row_tbl)?;
587                }
588            }
589
590            Ok(LuaValue::Table(result_table))
591        }
592    })
593}
594
595// ── last ──────────────────────────────────────────────────────────────────────
596
597/// Create the `std.ts.last(series, tags?)` async function.
598///
599/// Returns the most-recent data point for `series` (optionally filtered by
600/// `tags` using the same AND-conjunction as `std.ts.query`).
601///
602/// Return value:
603/// - `nil` — no matching row found
604/// - `{ ts = <i64>, value = <decoded>, tags = <table or nil> }` — latest row
605///
606/// # Arguments
607///
608/// - `lua`: the Lua state
609/// - `conn`: shared SQLite connection
610///
611/// # Errors
612///
613/// Returns `LuaError` on tag key validation failure, Mutex poison, or rusqlite
614/// error.
615fn make_last(lua: &Lua, conn: Arc<Mutex<Connection>>) -> LuaResult<LuaFunction> {
616    lua.create_async_function(move |lua, (series, tags): (String, Option<LuaTable>)| {
617        let conn = Arc::clone(&conn);
618        async move {
619            tracing::trace!(series = %series, "ts.last");
620
621            // ── extract and validate tags filter ──────────────────────
622            let tags_filter: Vec<(String, String)> = match tags {
623                None => vec![],
624                Some(tbl) => {
625                    let mut pairs = Vec::new();
626                    for p in tbl.pairs::<String, LuaValue>() {
627                        let (k, v) = p?;
628                        validate_tag_key(&k)?;
629                        let v_json = lua_to_json(&lua, v).map_err(LuaError::external)?;
630                        let v_str = match &v_json {
631                            serde_json::Value::String(s) => s.clone(),
632                            other => serde_json::to_string(other).map_err(LuaError::external)?,
633                        };
634                        pairs.push((k, v_str));
635                    }
636                    pairs
637                }
638            };
639
640            let tag_keys: Vec<String> = tags_filter.iter().map(|(k, _)| k.clone()).collect();
641            let tag_values: Vec<String> = tags_filter.iter().map(|(_, v)| v.clone()).collect();
642
643            // Build tag_clauses for WHERE.
644            let tag_clauses: String = tag_keys
645                .iter()
646                .map(|k| format!(" AND json_extract(tags, '$.{k}') = ?"))
647                .collect();
648
649            let sql = format!(
650                "SELECT ts, value, tags FROM ts \
651                 WHERE series = ? AND ts >= ? AND ts <= ?{tag_clauses} \
652                 ORDER BY ts DESC, rowid DESC LIMIT 1"
653            );
654
655            let row_raw: Result<Option<(i64, String, Option<String>)>, String> =
656                tokio::task::spawn_blocking(move || {
657                    let conn = conn
658                        .lock()
659                        .map_err(|e| format!("ts conn lock poisoned: {e}"))?;
660                    let mut stmt = conn
661                        .prepare(&sql)
662                        .map_err(|e| format!("ts last prepare: {e}"))?;
663
664                    let mut params: Vec<Box<dyn rusqlite::ToSql>> =
665                        vec![Box::new(series), Box::new(i64::MIN), Box::new(i64::MAX)];
666                    for v in tag_values {
667                        params.push(Box::new(v));
668                    }
669                    let param_refs: Vec<&dyn rusqlite::ToSql> =
670                        params.iter().map(|p| p.as_ref()).collect();
671
672                    let mut rows = stmt
673                        .query(param_refs.as_slice())
674                        .map_err(|e| format!("ts last query: {e}"))?;
675
676                    if let Some(row) = rows.next().map_err(|e| format!("ts last row: {e}"))? {
677                        let ts_val: i64 = row.get(0).map_err(|e| format!("ts last ts col: {e}"))?;
678                        let value_str: String =
679                            row.get(1).map_err(|e| format!("ts last value col: {e}"))?;
680                        let tags_str: Option<String> =
681                            row.get(2).map_err(|e| format!("ts last tags col: {e}"))?;
682                        Ok(Some((ts_val, value_str, tags_str)))
683                    } else {
684                        Ok(None)
685                    }
686                })
687                .await
688                .map_err(|e| LuaError::external(format!("ts task: {e}")))?;
689
690            let row_opt = row_raw.map_err(|e| {
691                tracing::warn!(error = %e, "ts last failed");
692                LuaError::external(e)
693            })?;
694
695            match row_opt {
696                None => Ok(LuaValue::Nil),
697                Some((ts_val, value_str, tags_str)) => {
698                    let row_tbl = lua.create_table()?;
699                    row_tbl.set("ts", LuaValue::Integer(ts_val))?;
700
701                    // Two-Phase decode: raw JSON string → serde_json::Value → LuaValue
702                    let v_json: serde_json::Value =
703                        serde_json::from_str(&value_str).map_err(LuaError::external)?;
704                    let v_lv = json_to_lua(&lua, v_json)?;
705                    row_tbl.set("value", v_lv)?;
706
707                    let tags_lv = decode_tags_col(&lua, tags_str.as_deref())?;
708                    row_tbl.set("tags", tags_lv)?;
709
710                    Ok(LuaValue::Table(row_tbl))
711                }
712            }
713        }
714    })
715}
716
717// ── decode helpers ────────────────────────────────────────────────────────────
718
719/// Decode a SQLite `value` column (TEXT storing JSON) into a `LuaValue`.
720///
721/// Implements the Two-Phase deserialization pattern (Outline K-NEW):
722/// `String → serde_json::Value → LuaValue`.  A NULL column (`None`) or a
723/// NULL aggregate result returns `LuaValue::Nil`.
724///
725/// # Arguments
726///
727/// - `lua`: the Lua state
728/// - `raw`: the raw column string, or `None` for SQL NULL
729///
730/// # Errors
731///
732/// Returns `LuaError` if the string is not valid JSON.
733fn decode_value_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
734    match raw {
735        None => Ok(LuaValue::Nil),
736        Some(s) => {
737            let v: serde_json::Value = serde_json::from_str(s).map_err(LuaError::external)?;
738            json_to_lua(lua, v)
739        }
740    }
741}
742
743/// Decode a SQLite `tags` column (TEXT storing a JSON object, or NULL) into a
744/// `LuaValue`.
745///
746/// NULL tags columns return `LuaValue::Nil` (row has no tags).
747///
748/// # Arguments
749///
750/// - `lua`: the Lua state
751/// - `raw`: the raw column string, or `None` for SQL NULL
752///
753/// # Errors
754///
755/// Returns `LuaError` if the string is not valid JSON.
756fn decode_tags_col(lua: &Lua, raw: Option<&str>) -> LuaResult<LuaValue> {
757    decode_value_col(lua, raw)
758}
759
#[cfg(test)]
mod tests {
    use super::*;
    use rusqlite::{params, Connection};

    /// Series name used by every fixture row.
    const SERIES: &str = "s";

    /// Millisecond timestamp shared by every fixture row, so that ordering can
    /// only be decided by the `rowid` tie-breaker.
    const SAME_MS: i64 = 1000;

    /// JSON-encoded payloads, listed in INSERT order.
    const PAYLOADS: [&str; 3] = ["\"first\"", "\"second\"", "\"third\""];

    /// Build an in-memory database with the `ts` table and its
    /// `(series, ts)` index (same columns as the schema in the module docs),
    /// then insert one row per entry of `PAYLOADS`, all with `ts = SAME_MS`
    /// and NULL tags.
    ///
    /// Panics (failing the calling test) if any SQLite step fails; none of
    /// them is expected to fail for a fresh in-memory database and literal
    /// SQL.
    fn seeded_conn() -> Connection {
        let conn = Connection::open_in_memory().expect("open in-memory sqlite");
        conn.execute_batch(
            "CREATE TABLE ts (series TEXT NOT NULL, ts INTEGER NOT NULL, \
             tags TEXT, value TEXT NOT NULL); \
             CREATE INDEX idx_ts_series_ts ON ts(series, ts);",
        )
        .expect("ddl");

        for payload in PAYLOADS {
            conn.execute(
                "INSERT INTO ts (series, ts, tags, value) VALUES (?, ?, NULL, ?)",
                params![SERIES, SAME_MS, payload],
            )
            .expect("insert");
        }
        conn
    }

    /// Same-ms INSERT order is preserved by `ORDER BY ts, rowid` (raw path).
    ///
    /// Runs the SQL produced by `build_query_sql` (raw mode) against three
    /// rows sharing one `ts` value and verifies the returned values match
    /// INSERT order exactly.  Also asserts that the generated SQL string
    /// contains the rowid tie-breaker.
    ///
    /// # Test categories
    ///
    /// - (T1) Happy path: normal INSERT + query flow with production DDL.
    /// - (T2) Edge case: all rows share identical `ts` millisecond value.
    /// - (T3) Regression guard: SQL string must contain `ORDER BY ts, rowid`.
    #[test]
    fn raw_path_same_ms_preserves_insert_order() {
        // 1) The generated SQL must carry the tie-breaker.
        let sql = build_query_sql(None, None, &[], None, None).expect("build_query_sql");
        assert!(
            sql.contains("ORDER BY ts, rowid"),
            "raw path SQL missing rowid tie-breaker: {sql}"
        );

        // 2) Execute the generated SQL against the seeded fixture.
        let conn = seeded_conn();
        let mut stmt = conn.prepare(&sql).expect("prepare");
        let rows: Vec<String> = stmt
            .query_map(params![SERIES, i64::MIN, i64::MAX], |r| {
                r.get::<_, String>(1)
            })
            .expect("query")
            .collect::<Result<Vec<_>, _>>()
            .expect("collect");

        let expected: Vec<String> = PAYLOADS.iter().map(|p| p.to_string()).collect();
        assert_eq!(
            rows, expected,
            "raw path returned rows in non-INSERT order: {rows:?}"
        );
    }

    /// Same-ms `last` returns the most recently INSERTed row.
    ///
    /// Verifies that the SQL string produced by
    /// `build_query_sql(Some("last"), ...)` contains
    /// `ORDER BY ts DESC, rowid DESC LIMIT 1`, and that a query of the same
    /// shape as the one `make_last` builds returns the last INSERT value even
    /// when all rows share the same `ts` ms.
    ///
    /// # Test categories
    ///
    /// - (T1) Happy path: last value retrieval with production DDL.
    /// - (T2) Edge case: all rows share identical `ts` millisecond value.
    /// - (T3) Regression guard: SQL string must contain `ORDER BY ts DESC, rowid DESC LIMIT 1`.
    #[test]
    fn last_path_same_ms_returns_last_insert() {
        // 1) The `last` form of build_query_sql must carry the tie-breaker.
        let sql_last =
            build_query_sql(Some("last"), None, &[], None, None).expect("build_query_sql last");
        assert!(
            sql_last.contains("ORDER BY ts DESC, rowid DESC LIMIT 1"),
            "last path SQL missing rowid DESC tie-breaker: {sql_last}"
        );

        // 2) Execute a make_last-equivalent query against the seeded fixture.
        //
        // NOTE: this literal is a hand-maintained copy of the statement that
        // make_last formats inline (with no tag clauses).  Because the copy is
        // what gets executed, this test pins the behaviour of the ordering
        // clause itself; it does NOT detect a later edit to make_last's
        // format! string.  Keep the two in sync manually.
        let make_last_sql = "SELECT ts, value, tags FROM ts \
             WHERE series = ? AND ts >= ? AND ts <= ? \
             ORDER BY ts DESC, rowid DESC LIMIT 1";

        let conn = seeded_conn();
        let mut stmt = conn.prepare(make_last_sql).expect("prepare");
        let value: String = stmt
            .query_row(params![SERIES, i64::MIN, i64::MAX], |r| {
                r.get::<_, String>(1)
            })
            .expect("query_row");

        assert_eq!(
            value, "\"third\"",
            "last path returned non-last INSERT value: {value}"
        );
    }
}