Skip to main content

pond/
sql.rs

1//! `pond_sql_query`: read-only DataFusion SQL over the three Lance tables
2//! (`sessions` / `messages` / `parts`), registered as `LanceTableProvider`s
3//! (behind plan-time views that rename `id` to `message_id` / `session_id`)
//! on a fresh per-call `SessionContext`. Read-only is enforced in two layers - a
//! single-query pre-parse (one `SELECT`/`WITH`, or `EXPLAIN` of one) and
//! `sql_with_options` with DDL/DML disabled (statements are enabled only for
//! `EXPLAIN`) - so no statement that mutates the corpus or touches the filesystem
//! (INSERT/UPDATE/DELETE/CREATE/DROP/COPY/CREATE EXTERNAL TABLE/SET) can run.
8//! Results render inline (row-capped) or export to a parquet/ndjson file the
9//! caller fetches via the `pond-sql-export://` resource (`src/transport.rs`).
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use anyhow::anyhow;
16use arrow_json::LineDelimitedWriter;
17use lance::Dataset;
18use lance::datafusion::LanceTableProvider;
19use lance::deps::arrow_array::builder::{
20    BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
21};
22use lance::deps::arrow_array::{
23    Array, ArrayRef, GenericStringArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
24    StringArray, StringViewArray,
25};
26use lance::deps::arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
27use lance::deps::datafusion::arrow::util::pretty::pretty_format_batches;
28use lance::deps::datafusion::catalog::{Session, TableFunctionImpl, TableProvider};
29use lance::deps::datafusion::common::ScalarValue;
30use lance::deps::datafusion::datasource::{ViewTable, provider_as_source};
31use lance::deps::datafusion::error::DataFusionError;
32use lance::deps::datafusion::execution::SessionStateBuilder;
33use lance::deps::datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use lance::deps::datafusion::logical_expr::{
35    ColumnarValue, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
36    TypeSignature, Volatility,
37};
38use lance::deps::datafusion::logical_expr::{Expr, TableType};
39use lance::deps::datafusion::physical_plan::ExecutionPlan;
40use lance::deps::datafusion::prelude::{SQLOptions, SessionConfig, SessionContext, col};
41use lance::deps::datafusion::sql::parser::{DFParser, Statement as DfStatement};
42use lance::deps::datafusion::sql::sqlparser::ast::{SetExpr, Statement as SqlStatement};
43use lance_arrow::SchemaExt;
44use lance_datafusion::udf::register_functions;
45use lance_index::scalar::FullTextSearchQuery;
46use lance_index::scalar::inverted::parser::from_json;
47use parquet::arrow::ArrowWriter;
48
/// Per-query memory ceiling for the DataFusion runtime (512 MiB). Not enforced
/// on every operator (datafusion caveat), so the timeout below is the hard
/// backstop.
const MEM_LIMIT_BYTES: usize = 512 * 1024 * 1024;
/// Wall-clock cap on `collect()` (30 seconds). DataFusion 53 has no built-in
/// query timeout, so this `tokio::time::timeout` is the only guard against a
/// runaway plan.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Byte budget for the inline (rendered table) result; rows are dropped to fit.
const INLINE_BUDGET_BYTES: usize = 80_000;
/// Hard ceiling on an export artifact (100 MiB): base64'd over `resources/read`
/// it costs ~1.33x this in the response, so keep it well under any process
/// envelope.
const MAX_EXPORT_BYTES: usize = 100 * 1024 * 1024;
/// Default inline row cap when the caller passes no `limit`.
pub const DEFAULT_INLINE_ROWS: usize = 100;
/// Upper bound on the caller-supplied inline `limit`.
pub const MAX_INLINE_ROWS: usize = 1_000;
64
/// Serialization format of an exported result. Vector columns are excluded
/// and JSON columns are decoded to text before encoding (see [`displayable`]).
#[derive(Debug, Clone, Copy)]
pub enum Format {
    Parquet,
    Ndjson,
}

impl Format {
    /// `(file extension, MIME type)` for this format; the single table both
    /// public accessors read from.
    fn descriptor(self) -> (&'static str, &'static str) {
        match self {
            Self::Parquet => ("parquet", "application/vnd.apache.parquet"),
            Self::Ndjson => ("ndjson", "application/x-ndjson"),
        }
    }

    /// File extension (without the leading dot) for an export in this format.
    pub fn ext(self) -> &'static str {
        let (extension, _) = self.descriptor();
        extension
    }

    /// MIME type advertised for an export in this format.
    pub fn mime(self) -> &'static str {
        let (_, mime_type) = self.descriptor();
        mime_type
    }
}
88
/// How `pond_sql_query` returns results. `run` matches on this to decide
/// which [`Outcome`] variant it produces.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// Render a row-capped table into the tool result ([`Outcome::Inline`]).
    Inline,
    /// Write the full result to a file and return a `pond-sql-export://` link.
    /// `run` only encodes the result in the given [`Format`] and hands back
    /// the bytes ([`Outcome::Export`]); it does not write the file itself.
    Export(Format),
}
97
/// The Lance datasets a query references, fetched fresh per call so each query
/// sees a current snapshot (the handle freshness gate runs on each
/// `Store::dataset`). A field is `None` when the query never names that table -
/// the caller skips opening it, avoiding the slow `parts.lance` open on the
/// common messages-only query (spec.md#search). See [`mentions_table`].
pub struct Tables {
    /// Backing dataset for the `sessions` SQL table; `None` leaves it unregistered.
    pub sessions: Option<Arc<Dataset>>,
    /// Backing dataset for the `messages` SQL table; `None` leaves it unregistered.
    pub messages: Option<Arc<Dataset>>,
    /// Backing dataset for the `parts` SQL table; `None` leaves it unregistered.
    pub parts: Option<Arc<Dataset>>,
}
108
/// Whether `sql` references the table named `table`. A DataFusion query can
/// only reach a registered table by writing its name literally - no alias hides
/// the base name - so this word-boundary scan never yields a false negative.
/// At worst it matches the name inside a string or column literal and opens a
/// table the query won't touch: a cheap, safe false positive. Lets the caller
/// open only the datasets a query needs.
///
/// The comparison ignores ASCII case on *both* sides, so neither the casing
/// used in the query nor the casing of `table` can cause a miss, and no
/// lowercased copy of the whole query is allocated.
pub fn mentions_table(sql: &str, table: &str) -> bool {
    // Tokens are maximal runs of identifier characters (alphanumerics and `_`).
    sql.split(|c: char| !c.is_alphanumeric() && c != '_')
        .any(|token| token.eq_ignore_ascii_case(table))
}
120
/// Result of a successful `run`.
pub enum Outcome {
    /// A rendered, row-capped table (already includes the metrics footer).
    Inline(String),
    /// Encoded export bytes plus metadata for the caller's summary/resource.
    Export {
        /// The encoded result, in `format`.
        bytes: Vec<u8>,
        /// Serialization used for `bytes`.
        format: Format,
        /// Total row count across all result batches.
        rows: usize,
        /// Column names, taken from the first result batch's schema.
        columns: Vec<String>,
    },
}
133
/// Two error channels: `Query` is caller-fixable (parse/plan/exec/limits) and
/// the tool surfaces it as an `isError` result so the model self-corrects;
/// `Infra` is an internal failure surfaced as a protocol error.
#[derive(Debug)]
pub enum SqlError {
    /// Caller-fixable failure; the string is the message shown to the caller.
    Query(String),
    /// Internal failure (runtime setup, table registration, Arrow errors).
    Infra(anyhow::Error),
}
142
143fn infra(error: ArrowError) -> SqlError {
144    SqlError::Infra(anyhow::Error::new(error))
145}
146
/// Execute one read-only SQL query and return either a rendered, row-capped
/// table ([`Outcome::Inline`]) or encoded export bytes ([`Outcome::Export`]).
///
/// Steps, in order: the read-only parse gate, the text-level rejections
/// (EXPLAIN with export, projecting `vector`, CAST on JSONB columns,
/// whole-document leading-wildcard LIKE), context construction and table
/// registration, planning under restrictive `SQLOptions`, execution under
/// [`QUERY_TIMEOUT`], and finally rendering or encoding.
///
/// `tables` supplies the datasets to register, `sql` is the caller's query,
/// `mode` selects the output shape, and `inline_rows` is the row cap handed
/// to the inline renderer (unused for exports).
///
/// # Errors
///
/// [`SqlError::Query`] for gate rejections, parse/plan/execution errors, the
/// timeout and an oversize export; [`SqlError::Infra`] for context or
/// registration failures and Arrow errors while preparing the output.
pub async fn run(
    tables: &Tables,
    sql: &str,
    mode: Mode,
    inline_rows: usize,
) -> Result<Outcome, SqlError> {
    let parsed = parse_and_gate(sql)?;
    if matches!(parsed.kind, StatementKind::Explain) && matches!(mode, Mode::Export(_)) {
        return Err(SqlError::Query(
            "EXPLAIN returns a plan, not a result set; use format=text (or json) to read it"
                .to_owned(),
        ));
    }
    if projection_mentions_vector(parsed.projection_query()) {
        return Err(SqlError::Query(
            "the `vector` column is not selectable from pond_sql_query (it is a \
             FixedSizeList<f32> embedding, ~600 bytes per row and not useful in a result). \
             For semantic search use pond_search. Filtering on it is allowed in WHERE \
             (e.g. `vector IS NOT NULL`)."
                .to_owned(),
        ));
    }
    if jsonb_cast_misuse(sql) {
        return Err(SqlError::Query(
            "CAST / `::` does not work on the binary JSONB columns (variant_data, options) - \
             when the bytes happen to be valid text it can even silently return garbage. \
             Stringify the whole value with json_extract(col, '$') or read one field with \
             json_extract(col, '$.field')."
                .to_owned(),
        ));
    }
    if jsonb_fulldoc_like_scan(sql) {
        return Err(SqlError::Query(
            "a leading-wildcard LIKE over the whole JSONB document - \
             json_extract(variant_data, '$') LIKE '%...%' - stringifies and scans every row, \
             so over parts it will not finish within the time limit. There is no substring \
             index on tool bodies yet (TODO #47: lance v8 FM-Index). Instead match a single \
             field with json_extract(variant_data, '$.field') LIKE '...', scope to one session \
             with session_id = '<id>' and read it with pond_get, or search conversational text \
             with contains_tokens(search_text, '...')."
                .to_owned(),
        ));
    }
    // All cheap rejections are done; only now pay for a context and the
    // table/function registrations.
    let ctx = build_context()?;
    register(&ctx, tables)?;

    // Defense in depth on top of the pre-parse gate: SQLOptions blocks DDL/DML
    // at planning time. `allow_statements` stays false for a plain SELECT (the
    // parse-time gate already rejects SET/SHOW etc.) but must be true for
    // EXPLAIN, which DataFusion classifies as a Statement node. The inner
    // query of an EXPLAIN was vetted by the gate above.
    let options = SQLOptions::new()
        .with_allow_ddl(false)
        .with_allow_dml(false)
        .with_allow_statements(matches!(parsed.kind, StatementKind::Explain));
    let df = ctx
        .sql_with_options(sql, options)
        .await
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;

    // Captured before `collect()` consumes `df`, so an empty result still
    // renders its column headers.
    let result_schema = Arc::new(df.schema().as_arrow().clone());
    let started = Instant::now();
    // TODO(#47): substring hunts inside parts.variant_data (json_extract +
    // LIKE full scans) are the dominant real-world cause of this timeout. The
    // planned fix is lance v8's FM-Index on variant_data (raw-byte substring
    // search via `contains(variant_data, 'needle')`); until it lands, the
    // message steers agents to predicates the current indexes can serve.
    let collected = tokio::time::timeout(QUERY_TIMEOUT, df.collect())
        .await
        .map_err(|_| {
            SqlError::Query(format!(
                "query exceeded the {}s limit; add a narrower WHERE or a LIMIT. If you were \
                 substring-scanning variant_data (json_extract + LIKE), there is no \
                 substring index on tool bodies yet: filter parts by type and \
                 json_get_string(variant_data, 'name') first, or search conversational \
                 text with contains_tokens(search_text, '...') instead.",
                QUERY_TIMEOUT.as_secs()
            ))
        })?
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;
    // Measures execution (`collect`) only; planning happened before `started`.
    let elapsed = started.elapsed();

    // Every batch goes through `displayable`; a result with no batches is
    // replaced by one empty batch so `display` is never empty.
    let display: Vec<RecordBatch> = if collected.is_empty() {
        vec![displayable(&RecordBatch::new_empty(result_schema)).map_err(infra)?]
    } else {
        collected
            .into_iter()
            .map(|batch| displayable(&batch))
            .collect::<Result<_, _>>()
            .map_err(infra)?
    };

    match mode {
        Mode::Inline => Ok(Outcome::Inline(
            render_inline(&display, inline_rows, elapsed).map_err(infra)?,
        )),
        Mode::Export(format) => {
            let rows = display.iter().map(RecordBatch::num_rows).sum();
            let columns = display
                .first()
                .map(|batch| {
                    batch
                        .schema()
                        .fields()
                        .iter()
                        .map(|field| field.name().clone())
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default();
            let bytes = match format {
                Format::Parquet => encode_parquet(&display)?,
                Format::Ndjson => encode_ndjson(&display)?,
            };
            // The size limit is checked on the encoded artifact, after the
            // whole result has been collected and serialized.
            if bytes.len() > MAX_EXPORT_BYTES {
                return Err(SqlError::Query(format!(
                    "export is {} bytes, over the {MAX_EXPORT_BYTES} byte limit; \
                     narrow the query or aggregate",
                    bytes.len()
                )));
            }
            Ok(Outcome::Export {
                bytes,
                format,
                rows,
                columns,
            })
        }
    }
}
280
/// Top-level statement shape allowed past the read-only gate. `run` uses it
/// to refuse exporting an EXPLAIN and to enable `allow_statements` only for
/// EXPLAIN.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatementKind {
    /// A plain `Query` (SELECT/WITH/VALUES/UNION).
    Query,
    /// `EXPLAIN [ANALYZE] <query>` - planning info only, no mutation.
    Explain,
}
289
/// Parsed top-level statement, normalized so downstream checks always see a
/// projection-bearing `Query` regardless of whether the user wrote `SELECT`
/// or `EXPLAIN SELECT`. DataFusion's parser wraps EXPLAIN in its own
/// `DfStatement::Explain` variant (separate from sqlparser's
/// `SqlStatement::Explain`), so the gate has to peel both layers.
struct ParsedStatement {
    /// Whether the caller wrote the query bare or wrapped in EXPLAIN.
    kind: StatementKind,
    /// The query itself: for EXPLAIN this is the inner, explained query.
    /// An owned clone of the parsed AST node.
    query: lance::deps::datafusion::sql::sqlparser::ast::Query,
}

impl ParsedStatement {
    /// The query whose projection the gates should inspect (the inner query
    /// when the statement is an EXPLAIN).
    fn projection_query(&self) -> &lance::deps::datafusion::sql::sqlparser::ast::Query {
        &self.query
    }
}
305
306/// Read-only gate: parse the SQL and require exactly one top-level `Query` or
307/// `EXPLAIN <Query>`. Rejects DDL/DML/COPY/SET/SHOW and multi-statement input,
308/// which `SQLOptions` alone does not catch at planning time. EXPLAIN of a
309/// non-Query (e.g. `EXPLAIN INSERT ...`) is also rejected: EXPLAIN itself is
310/// read-only, but letting the inner shape be DDL/DML widens the surface area
311/// the gate has to reason about for no real agent gain.
312fn parse_and_gate(sql: &str) -> Result<ParsedStatement, SqlError> {
313    let statements = DFParser::parse_sql(sql)
314        .map_err(|error| SqlError::Query(format!("SQL parse error: {error}")))?;
315    if statements.len() != 1 {
316        return Err(SqlError::Query(
317            "pond_sql_query runs exactly one statement; submit a single SELECT".to_owned(),
318        ));
319    }
320    let Some(front) = statements.front() else {
321        return Err(read_only_rejection());
322    };
323    match front {
324        DfStatement::Statement(boxed) => match boxed.as_ref() {
325            SqlStatement::Query(query) => Ok(ParsedStatement {
326                kind: StatementKind::Query,
327                query: query.as_ref().clone(),
328            }),
329            _ => Err(read_only_rejection()),
330        },
331        DfStatement::Explain(explain) => match explain.statement.as_ref() {
332            DfStatement::Statement(inner) => match inner.as_ref() {
333                SqlStatement::Query(query) => Ok(ParsedStatement {
334                    kind: StatementKind::Explain,
335                    query: query.as_ref().clone(),
336                }),
337                _ => Err(read_only_rejection()),
338            },
339            _ => Err(read_only_rejection()),
340        },
341        _ => Err(read_only_rejection()),
342    }
343}
344
345fn read_only_rejection() -> SqlError {
346    SqlError::Query(
347        "pond_sql_query is read-only: only a single SELECT/WITH (or EXPLAIN of one) is \
348         allowed (no INSERT/UPDATE/DELETE/CREATE/DROP/COPY/SET)"
349            .to_owned(),
350    )
351}
352
/// Reject any top-level projection that explicitly references the embedding
/// `vector` column. Today such queries silently return an empty column (the
/// FixedSizeList<f32> is stripped by `displayable`), which wastes agent tokens
/// diagnosing. WHERE/HAVING references stay legal - the doc lets agents filter
/// on it (e.g. `WHERE vector IS NOT NULL`); only projecting the column out is
/// blocked. Heuristic: tokenize each top-level SELECT item and look for a bare
/// `vector` identifier. Covers `SELECT vector`, `SELECT id, vector`,
/// `SELECT m.vector`, and `SELECT array_length(vector)`. Wildcards (`*` /
/// `messages.*`) keep the existing silent-strip behavior since they don't name
/// the column explicitly.
///
/// Only the query body is walked (SELECTs, parenthesized queries and both
/// sides of set operations); CTE definitions and subqueries in FROM are not
/// inspected here.
fn projection_mentions_vector(query: &lance::deps::datafusion::sql::sqlparser::ast::Query) -> bool {
    walk_set_expr_for_vector(query.body.as_ref())
}
366
367fn walk_set_expr_for_vector(expr: &SetExpr) -> bool {
368    match expr {
369        SetExpr::Select(select) => select
370            .projection
371            .iter()
372            .any(|item| mentions_vector_token(&item.to_string())),
373        SetExpr::Query(inner) => walk_set_expr_for_vector(inner.body.as_ref()),
374        SetExpr::SetOperation { left, right, .. } => {
375            walk_set_expr_for_vector(left) || walk_set_expr_for_vector(right)
376        }
377        _ => false,
378    }
379}
380
/// Whether `text` (one projection item rendered back to SQL) contains the
/// identifier `vector` as a whole token, i.e. delimited by anything that is
/// not alphanumeric or `_`.
///
/// The match ignores ASCII case: the rendered item keeps the casing the
/// caller typed, while unquoted identifiers are folded to lowercase when the
/// query is planned, so `VECTOR` and `m.Vector` name the same column and must
/// not slip past the gate.
fn mentions_vector_token(text: &str) -> bool {
    text.split(|c: char| !c.is_alphanumeric() && c != '_')
        .any(|token| token.eq_ignore_ascii_case("vector"))
}
385
/// Plan-time gate for CAST / `::` applied to the binary JSONB columns
/// (`variant_data`, `options`).
///
/// The runtime failure is data-dependent: CAST only errors when it meets a
/// non-UTF8 byte, and JSONB header bytes are often valid ASCII, so it can
/// silently "succeed" and return binary garbage. Hence the rejection happens
/// before any scan. This is a token-scan heuristic in the spirit of
/// `projection_mentions_vector`; an aliased column that slips through still
/// hits the `enrich` runtime hint.
///
/// Two shapes are recognised on the ASCII-lowercased text:
/// `<col> :: <type>` and `CAST(<qualifier.>col AS <type>`.
fn jsonb_cast_misuse(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const CAST_KEYWORD: &str = "cast";

    fn is_ident_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    let lowered = sql.to_ascii_lowercase();
    let bytes = lowered.as_bytes();

    // Shape 1: a whole-word column name directly followed (modulo whitespace)
    // by the `::` cast operator.
    let postfix_cast = JSONB_COLUMNS.iter().any(|column| {
        lowered.match_indices(*column).any(|(begin, matched)| {
            let end = begin + matched.len();
            let word_starts_here = begin == 0 || !is_ident_byte(bytes[begin - 1]);
            let word_ends_here = end == bytes.len() || !is_ident_byte(bytes[end]);
            word_starts_here
                && word_ends_here
                && lowered[end..].trim_start().starts_with("::")
        })
    });
    if postfix_cast {
        return true;
    }

    // Shape 2: `cast` (not the tail of a longer identifier), an opening
    // paren, an optional `qualifier.`, the column, then the `AS` keyword.
    lowered.match_indices(CAST_KEYWORD).any(|(begin, keyword)| {
        if begin > 0 && is_ident_byte(bytes[begin - 1]) {
            return false;
        }
        let after_keyword = &lowered[begin + keyword.len()..];
        let Some(inside) = after_keyword.trim_start().strip_prefix('(') else {
            return false;
        };
        let inside = inside.trim_start();
        // A qualifier is a non-empty run of identifier bytes before the first dot.
        let qualifier_dot = inside.find('.').filter(|&dot| {
            dot > 0 && inside.as_bytes()[..dot].iter().all(|b| is_ident_byte(*b))
        });
        let operand = match qualifier_dot {
            Some(dot) => &inside[dot + 1..],
            None => inside,
        };
        JSONB_COLUMNS.iter().any(|column| {
            operand.strip_prefix(*column).is_some_and(|after| {
                let column_ends = !after.starts_with(|c: char| c.is_ascii_alphanumeric() || c == '_');
                column_ends
                    && after
                        .trim_start()
                        .strip_prefix("as")
                        .is_some_and(|rest| rest.starts_with(char::is_whitespace))
            })
        })
    })
}
445
/// Plan-time gate for the one substring shape that reliably exhausts the
/// wall-clock cap: a leading-wildcard LIKE/ILIKE over the *whole-document*
/// stringify of a binary JSONB column -
/// `json_extract(variant_data|options, '$') LIKE '%...%'`.
///
/// That shape materializes every row's entire JSONB blob just to
/// substring-scan it, and the leading `%` defeats every index; over parts
/// (>1M rows) it does not finish, even scoped to a day. A single-field
/// extract (`'$.name'`) or any non-leading pattern is left to run - only the
/// whole-document shape is rejected, so the agent gets the indexed path in
/// milliseconds instead of a timeout. Token-scan heuristic in the spirit of
/// `jsonb_cast_misuse`; the timeout message remains the backstop for anything
/// that slips through.
/// TODO(#47): lance v8's FM-Index gives raw-byte substring search
/// (`contains(variant_data, 'needle')`); retire this gate once it lands.
fn jsonb_fulldoc_like_scan(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const NEEDLE: &str = "json_extract";

    fn is_ident_byte(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    /// `after_name` is the lowercased text right after one `json_extract`
    /// occurrence; true when it continues as the rejected shape.
    fn fulldoc_like_follows(after_name: &str) -> bool {
        let Some(args) = after_name.trim_start().strip_prefix('(') else {
            return false;
        };
        let args = args.trim_start();
        // optional `qualifier.`
        let operand = match args.find('.') {
            Some(dot) if dot > 0 && args.as_bytes()[..dot].iter().all(|b| is_ident_byte(*b)) => {
                &args[dot + 1..]
            }
            _ => args,
        };
        let Some(column) = JSONB_COLUMNS
            .iter()
            .find(|column| operand.starts_with(**column))
        else {
            return false;
        };
        // Require the whole-document path `, '$' )` exactly - a single-field
        // extract (`'$.name'`) is fine and must keep running.
        let after_call = operand[column.len()..]
            .trim_start()
            .strip_prefix(',')
            .map(str::trim_start)
            .and_then(|t| t.strip_prefix("'$'"))
            .map(str::trim_start)
            .and_then(|t| t.strip_prefix(')'));
        let Some(after_call) = after_call else {
            return false;
        };
        // Step past whitespace and any wrapper close-parens
        // (`lower(...)`/`upper(...)`).
        let tail = after_call.trim_start_matches(|c: char| c == ')' || c.is_whitespace());
        // Optional `NOT` in front of the operator.
        let predicate = match tail.strip_prefix("not") {
            Some(next) if next.starts_with(char::is_whitespace) => next.trim_start(),
            _ => tail,
        };
        ["like", "ilike"].iter().any(|op| {
            predicate.strip_prefix(*op).is_some_and(|pattern| {
                pattern.starts_with(char::is_whitespace)
                    && pattern.trim_start().starts_with("'%")
            })
        })
    }

    let lowered = sql.to_ascii_lowercase();
    let bytes = lowered.as_bytes();
    lowered.match_indices(NEEDLE).any(|(begin, needle)| {
        // Skip occurrences that are the tail of a longer identifier.
        let starts_identifier = begin == 0 || !is_ident_byte(bytes[begin - 1]);
        starts_identifier && fulldoc_like_follows(&lowered[begin + needle.len()..])
    })
}
520
521fn build_context() -> Result<SessionContext, SqlError> {
522    let runtime = RuntimeEnvBuilder::new()
523        .with_memory_limit(MEM_LIMIT_BYTES, 1.0)
524        .build_arc()
525        .map_err(|error| SqlError::Infra(anyhow!("datafusion runtime init failed: {error}")))?;
526    // information_schema is the standard self-discovery path (SELECT ... FROM
527    // information_schema.columns); agents reach for it before any doc.
528    let state = SessionStateBuilder::new()
529        .with_config(SessionConfig::new().with_information_schema(true))
530        .with_runtime_env(runtime)
531        .with_default_features()
532        .build();
533    Ok(SessionContext::new_with_state(state))
534}
535
/// Plan-time key renames. A table's storage `id` is exposed under a
/// self-describing name so the same value never changes name between tables -
/// agents copy column names across queries. Returns `None` for a table whose
/// `id` is not renamed. This one table drives both the registered views and
/// fts() output, so they cannot diverge.
fn renamed_key(table: &str) -> Option<&'static str> {
    const RENAMES: [(&str, &str); 2] = [
        ("messages", "message_id"),
        ("sessions", "session_id"),
    ];
    RENAMES
        .iter()
        .find(|(name, _)| *name == table)
        .map(|(_, key)| *key)
}
547
/// Register on `ctx` everything a query may reference:
///
/// - `sessions` / `messages` as views that rename `id` per [`renamed_key`];
/// - `parts` as a view that projects away the `data` blob column;
/// - the `fts(...)` table function over whichever datasets are present;
/// - lance's UDFs, then the lenient `json_get_*` replacements;
/// - `any_value` as an alias of `first_value`;
/// - a scalar `fts` stub that only produces a corrective plan-time error.
///
/// A table whose field in `tables` is `None` is simply not registered.
/// Statement order matters: the lenient UDFs are registered after
/// `register_functions` so they replace the strict ones by name.
///
/// Failures building or registering a view are returned as
/// [`SqlError::Infra`].
fn register(ctx: &SessionContext, tables: &Tables) -> Result<(), SqlError> {
    for (name, dataset) in [
        ("sessions", &tables.sessions),
        ("messages", &tables.messages),
    ] {
        let Some(dataset) = dataset else { continue };
        // LanceTableProvider (not the bare Dataset impl) so WHERE/projection/
        // limit push into Lance's indexed scan; (false, false) hides _rowid /
        // _rowaddr from the SQL schema. The view applies `renamed_key`
        // plan-time only; storage keeps `id`.
        let provider = LanceTableProvider::new(dataset.clone(), false, false);
        let key = renamed_key(name).unwrap_or("id");
        let view = renamed_view(name, Arc::new(provider), "id", key)
            .map_err(|error| SqlError::Infra(anyhow!("build {name} view: {error}")))?;
        ctx.register_table(name, Arc::new(view))
            .map_err(|error| SqlError::Infra(anyhow!("register table {name}: {error}")))?;
    }
    // `parts` hides the `data` blob column behind a projecting view: blob
    // columns scan as `{position, size}` descriptor structs, so any SQL touch
    // dies in the planner with an opaque CAST error. The view inlines at plan
    // time - filters still push into the Lance scan underneath.
    if let Some(parts) = &tables.parts {
        let provider = LanceTableProvider::new(parts.clone(), false, false);
        // Every column except `data`, in schema order.
        let keep: Vec<_> = parts
            .schema()
            .fields
            .iter()
            .filter(|field| field.name != "data")
            .map(|field| col(field.name.as_str()))
            .collect();
        let plan = LogicalPlanBuilder::scan("parts", provider_as_source(Arc::new(provider)), None)
            .and_then(|builder| builder.project(keep))
            .and_then(LogicalPlanBuilder::build)
            .map_err(|error| SqlError::Infra(anyhow!("build parts view: {error}")))?;
        ctx.register_table("parts", Arc::new(ViewTable::new(plan, None)))
            .map_err(|error| SqlError::Infra(anyhow!("register table parts: {error}")))?;
    }
    // `fts('messages', '{...}')` BM25 search-in-SQL (vendored provider with a
    // declared `_score` column - see `ScoredFtsUdtf`), and lance's JSON /
    // contains_tokens UDFs for filtering inside the JSON columns. Only the
    // referenced tables are present, matching the registered views above.
    let datasets = [
        ("sessions", &tables.sessions),
        ("messages", &tables.messages),
        ("parts", &tables.parts),
    ]
    .into_iter()
    .filter_map(|(name, dataset)| dataset.clone().map(|d| (name.to_owned(), d)))
    .collect();
    let fts = ScoredFtsUdtf { datasets };
    ctx.register_udtf("fts", Arc::new(fts));
    register_functions(ctx);
    // Shadow lance's strict json_get_* by name: the strict versions abort the
    // whole scan when any row's field is non-scalar (e.g. tool_result `result`
    // arrays), turning one polymorphic value into a dead query.
    for udf in lenient_json_udfs() {
        ctx.register_udf(udf);
    }
    // `any_value` (Postgres 16 / DuckDB / BigQuery - agents reach for it)
    // doesn't exist in DataFusion 53; alias first_value, which satisfies the
    // same contract (any_value promises no ordering, so first-encountered is
    // a valid answer). register_udaf indexes aliases.
    if let Some(first_value) = ctx.state().aggregate_functions().get("first_value") {
        ctx.register_udaf(first_value.as_ref().clone().with_aliases(["any_value"]));
    }
    // `fts` as a *scalar* exists only to fail at plan time with the correction:
    // agents pattern-match FTS into WHERE (MySQL MATCH / Postgres @@ priors)
    // and DataFusion's stock error is "Did you mean 'cos'?". Scalar and
    // table-function registries are separate namespaces, so the real fts()
    // UDTF in FROM position is unaffected.
    ctx.register_udf(ScalarUDF::new_from_impl(FtsMisuse::new()));
    Ok(())
}
621
622/// Wrap `provider` in a view projecting every column, with `from` renamed to
623/// `to`. The view inlines at plan time, so filters and projections still push
624/// into the underlying Lance scan.
625fn renamed_view(
626    scan_name: &str,
627    provider: Arc<dyn TableProvider>,
628    from: &str,
629    to: &str,
630) -> Result<ViewTable, DataFusionError> {
631    let projection: Vec<_> = provider
632        .schema()
633        .fields()
634        .iter()
635        .map(|field| {
636            let column = col(field.name().as_str());
637            if field.name() == from {
638                column.alias(to)
639            } else {
640                column
641            }
642        })
643        .collect();
644    let plan = LogicalPlanBuilder::scan(scan_name, provider_as_source(provider), None)?
645        .project(projection)?
646        .build()?;
647    Ok(ViewTable::new(plan, None))
648}
649
650const FTS_MISUSE: &str = "fts is a table function and goes in FROM, not in WHERE or the \
651    projection. For filtering use WHERE contains_tokens(search_text, 'word1 word2') (all \
652    words must match; index-accelerated). For ranked results: SELECT m.message_id, f._score \
653    FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') f \
654    JOIN messages m ON m.message_id = f.message_id ORDER BY f._score DESC.";
655
/// See the registration comment: a plan-time teaching error for `WHERE fts(...)`.
///
/// A scalar UDF named `fts` that never computes anything: both resolving its
/// return type and invoking it fail with [`FTS_MISUSE`].
#[derive(Debug, PartialEq, Eq, Hash)]
struct FtsMisuse {
    /// Built with `Signature::variadic_any` - presumably so that any argument
    /// list reaches the teaching error instead of a signature mismatch
    /// (NOTE(review): confirm against DataFusion's signature docs).
    signature: Signature,
}

impl FtsMisuse {
    /// Create the stub with its catch-all signature.
    fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}

impl ScalarUDFImpl for FtsMisuse {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Registered under the same name as the real table function.
    fn name(&self) -> &str {
        "fts"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always fails with the corrective message as a plan error.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
    }

    /// Fails with the same corrective message if execution is ever reached.
    fn invoke_with_args(
        &self,
        _args: ScalarFunctionArgs,
    ) -> Result<ColumnarValue, DataFusionError> {
        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
    }
}
694
695/// Vendored replacement for lance's `FtsQueryUDTF` (lance-7.0.0
696/// src/dataset/udtf.rs). The upstream provider omits `_score` from its
697/// declared schema while leaving the scanner's scoring autoprojection on, so
698/// `_score` is physically appended but logically unknown: naming it in SQL
699/// fails ("No field named _score") and any aggregate over fts() dies on
700/// DataFusion's physical-vs-logical schema check (COUNT plans 0 columns,
701/// receives 1). This provider declares `_score` as a regular nullable Float32
702/// column, projects it explicitly, and disables the autoprojection - which is
703/// also lance's documented intended end state for score columns
704/// (scanner.rs "_score/_distance should become regular output columns").
705/// Delete once fixed upstream.
706#[derive(Debug)]
707struct ScoredFtsUdtf {
708    datasets: HashMap<String, Arc<Dataset>>,
709}
710
711impl TableFunctionImpl for ScoredFtsUdtf {
712    fn call(
713        &self,
714        expr: &[Expr],
715    ) -> Result<Arc<dyn TableProvider>, lance::deps::datafusion::error::DataFusionError> {
716        let [table_expr, query_expr] = expr else {
717            return Err(DataFusionError::Execution(
718                "fts() takes (table_name, fts_query_json)".to_owned(),
719            ));
720        };
721        let Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) = table_expr else {
722            return Err(DataFusionError::Execution(
723                "fts() first argument must be a table name string".to_owned(),
724            ));
725        };
726        let Expr::Literal(ScalarValue::Utf8(Some(fts_query)), _) = query_expr else {
727            return Err(DataFusionError::Execution(
728                "fts() second argument must be the fts query as a JSON string".to_owned(),
729            ));
730        };
731        let dataset = self.datasets.get(table_name).ok_or_else(|| {
732            DataFusionError::Execution(format!("fts(): table {table_name} not found"))
733        })?;
734        let mut full_schema = Schema::from(dataset.schema());
735        full_schema = full_schema
736            .try_with_column(Field::new(SCORE_COLUMN, DataType::Float32, true))
737            .map_err(|error| DataFusionError::ArrowError(Box::new(error), None))?;
738        let provider: Arc<dyn TableProvider> = Arc::new(ScoredFtsProvider {
739            dataset: dataset.clone(),
740            fts_query: FullTextSearchQuery::new_query(from_json(fts_query)?),
741            full_schema: Arc::new(full_schema),
742        });
743        // Same `renamed_key` as the registered views, so fts() output joins
744        // without a name switch.
745        match renamed_key(table_name) {
746            Some(key) => Ok(Arc::new(renamed_view("fts", provider, "id", key)?)),
747            None => Ok(provider),
748        }
749    }
750}
751
/// Name of the full-text relevance column that `ScoredFtsUdtf` adds to the
/// dataset schema as a nullable Float32 field.
const SCORE_COLUMN: &str = "_score";
753
/// Table provider behind one `fts(table, query)` call: scans `dataset` with
/// `fts_query` applied and exposes `full_schema` as its logical schema.
#[derive(Debug)]
struct ScoredFtsProvider {
    /// Dataset the full-text search runs against.
    dataset: Arc<Dataset>,
    /// Parsed full-text query; cloned into the scanner on every `scan`.
    fts_query: FullTextSearchQuery,
    /// Dataset schema with the `_score` column added (built in
    /// `ScoredFtsUdtf::call`); projection indices resolve against it.
    full_schema: SchemaRef,
}
760
761#[async_trait::async_trait]
762impl TableProvider for ScoredFtsProvider {
763    fn as_any(&self) -> &dyn std::any::Any {
764        self
765    }
766
767    fn schema(&self) -> SchemaRef {
768        self.full_schema.clone()
769    }
770
771    fn table_type(&self) -> TableType {
772        TableType::Temporary
773    }
774
775    async fn scan(
776        &self,
777        _state: &dyn Session,
778        projection: Option<&Vec<usize>>,
779        filters: &[Expr],
780        limit: Option<usize>,
781    ) -> Result<Arc<dyn ExecutionPlan>, lance::deps::datafusion::error::DataFusionError> {
782        let mut scan = self.dataset.scan();
783        scan.full_text_search(self.fts_query.clone())?;
784        // `_score` is a declared column projected explicitly below; with the
785        // autoprojection off, the physical batch always matches the logical
786        // plan (the mismatch is what breaks aggregates upstream).
787        scan.disable_scoring_autoprojection();
788        match projection {
789            Some(projection) if projection.is_empty() => {
790                scan.empty_project()?;
791            }
792            Some(projection) => {
793                let columns: Vec<&str> = projection
794                    .iter()
795                    .map(|idx| self.full_schema.field(*idx).name().as_str())
796                    .collect();
797                scan.project(&columns)?;
798            }
799            None => {
800                let columns: Vec<&str> = self
801                    .full_schema
802                    .fields()
803                    .iter()
804                    .map(|field| field.name().as_str())
805                    .collect();
806                scan.project(&columns)?;
807            }
808        }
809        if let Some(combined) = filters
810            .iter()
811            .cloned()
812            .reduce(|left, right| left.and(right))
813        {
814            scan.filter_expr(combined);
815        }
816        scan.limit(limit.map(|l| l as i64), None)?;
817        scan.create_plan().await.map_err(DataFusionError::from)
818    }
819}
820
/// The four scalar shapes the lenient JSON getters produce. Each variant
/// selects the output builder used by `json_get_lenient`.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum JsonGet {
    /// `json_get_string`: Utf8 output; values that are not JSON strings are
    /// serialized to JSON text.
    Text,
    /// `json_get_int`: Int64 output; NULL when the value does not convert.
    Int,
    /// `json_get_float`: Float64 output; NULL when the value does not convert.
    Float,
    /// `json_get_bool`: Boolean output; NULL when the value does not convert.
    Bool,
}
829
/// Deepest key path the lenient getters accept; deeper nesting is what
/// json_extract's JSONPath is for. `json_key_path_signature` declares one
/// exact arity per path length from 1 up to this value.
const MAX_JSON_KEYS: usize = 6;
833
834/// Lenient replacements for lance's `json_get_string` / `_int` / `_float` /
835/// `_bool`. The strict originals call jsonb's exact converters and turn one
836/// non-scalar field value into a query-wide abort ("Failed to convert to
837/// string: InvalidCast"). Lenient semantics: a string getter serializes
838/// objects/arrays to JSON text; the typed getters return NULL on a
839/// non-coercible value. Unlike lance's one-key originals they take a variadic
840/// key path - `json_get_string(col, 'a', 'b')` - the datafusion-functions-json
841/// convention agents reach for first. Registered after `register_functions`
842/// so they shadow by name.
843fn lenient_json_udfs() -> [ScalarUDF; 4] {
844    let make = |name: &'static str, kind: JsonGet, return_type: DataType| {
845        ScalarUDF::new_from_impl(LenientJsonGet {
846            name,
847            kind,
848            return_type,
849            signature: json_key_path_signature(),
850        })
851    };
852    [
853        make("json_get_string", JsonGet::Text, DataType::Utf8),
854        make("json_get_int", JsonGet::Int, DataType::Int64),
855        make("json_get_float", JsonGet::Float, DataType::Float64),
856        make("json_get_bool", JsonGet::Bool, DataType::Boolean),
857    ]
858}
859
860/// `(LargeBinary, Utf8)` through `(LargeBinary, Utf8 x MAX_JSON_KEYS)`.
861fn json_key_path_signature() -> Signature {
862    let arities = (1..=MAX_JSON_KEYS)
863        .map(|keys| {
864            let mut types = vec![DataType::LargeBinary];
865            types.extend(std::iter::repeat_n(DataType::Utf8, keys));
866            TypeSignature::Exact(types)
867        })
868        .collect();
869    Signature::one_of(arities, Volatility::Immutable)
870}
871
/// See [`lenient_json_udfs`]. One instance backs each lenient getter.
#[derive(Debug, PartialEq, Eq, Hash)]
struct LenientJsonGet {
    /// SQL function name this instance is exposed under.
    name: &'static str,
    /// Output shape; passed to `json_get_lenient` to pick the conversion.
    kind: JsonGet,
    /// Arrow type reported from `return_type`.
    return_type: DataType,
    /// Accepted argument shapes (see [`json_key_path_signature`]).
    signature: Signature,
}
880
881impl ScalarUDFImpl for LenientJsonGet {
882    fn as_any(&self) -> &dyn std::any::Any {
883        self
884    }
885
886    fn name(&self) -> &str {
887        self.name
888    }
889
890    fn signature(&self) -> &Signature {
891        &self.signature
892    }
893
894    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
895        Ok(self.return_type.clone())
896    }
897
898    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue, DataFusionError> {
899        json_get_lenient(&args.args, &self.kind)
900    }
901}
902
903/// One step of the key walk: object member by name, array element by index.
904fn json_step(raw: jsonb::RawJsonb<'_>, key: &str) -> Option<jsonb::OwnedJsonb> {
905    let value = if raw.is_object().unwrap_or(false) {
906        raw.get_by_name(key, false).ok().flatten()
907    } else if raw.is_array().unwrap_or(false) {
908        key.parse::<usize>()
909            .ok()
910            .and_then(|index| raw.get_by_index(index).ok().flatten())
911    } else {
912        None
913    };
914    value.filter(|value| !value.as_raw().is_null().unwrap_or(false))
915}
916
917fn json_get_lenient(
918    args: &[ColumnarValue],
919    kind: &JsonGet,
920) -> Result<ColumnarValue, DataFusionError> {
921    let arrays = ColumnarValue::values_to_arrays(args)?;
922    let Some((jsonb_arg, key_args)) = arrays.split_first().filter(|(_, keys)| !keys.is_empty())
923    else {
924        return Err(DataFusionError::Execution(
925            "json_get_* takes (json_column, 'key', ...) - at least one key".to_owned(),
926        ));
927    };
928    let jsonb_array = jsonb_arg
929        .as_any()
930        .downcast_ref::<LargeBinaryArray>()
931        .ok_or_else(|| {
932            DataFusionError::Execution(
933                "json_get_* argument 1 must be a JSON column (variant_data, options)".to_owned(),
934            )
935        })?;
936    let key_arrays: Vec<&StringArray> = key_args
937        .iter()
938        .map(|key_arg| {
939            key_arg
940                .as_any()
941                .downcast_ref::<StringArray>()
942                .ok_or_else(|| {
943                    DataFusionError::Execution("json_get_* keys must be string literals".to_owned())
944                })
945        })
946        .collect::<Result<_, _>>()?;
947
948    let field = |row: usize| -> Option<jsonb::OwnedJsonb> {
949        if jsonb_array.is_null(row) {
950            return None;
951        }
952        let mut keys = key_arrays.iter();
953        let first = keys.next()?;
954        if first.is_null(row) {
955            return None;
956        }
957        let mut current = json_step(
958            jsonb::RawJsonb::new(jsonb_array.value(row)),
959            first.value(row),
960        )?;
961        for key_array in keys {
962            if key_array.is_null(row) {
963                return None;
964            }
965            current = json_step(current.as_raw(), key_array.value(row))?;
966        }
967        Some(current)
968    };
969
970    let rows = jsonb_array.len();
971    let array: Arc<dyn Array> = match kind {
972        JsonGet::Text => {
973            let mut builder = StringBuilder::with_capacity(rows, 1024);
974            for row in 0..rows {
975                match field(row) {
976                    // Scalar strings come back unquoted; objects/arrays/
977                    // numbers serialize to JSON text instead of erroring.
978                    Some(value) => match value.as_raw().to_str() {
979                        Ok(text) => builder.append_value(text),
980                        Err(_) => builder.append_value(value.to_string()),
981                    },
982                    None => builder.append_null(),
983                }
984            }
985            Arc::new(builder.finish())
986        }
987        JsonGet::Int => {
988            let mut builder = Int64Builder::with_capacity(rows);
989            for row in 0..rows {
990                builder.append_option(field(row).and_then(|value| value.as_raw().to_i64().ok()));
991            }
992            Arc::new(builder.finish())
993        }
994        JsonGet::Float => {
995            let mut builder = Float64Builder::with_capacity(rows);
996            for row in 0..rows {
997                builder.append_option(field(row).and_then(|value| value.as_raw().to_f64().ok()));
998            }
999            Arc::new(builder.finish())
1000        }
1001        JsonGet::Bool => {
1002            let mut builder = BooleanBuilder::with_capacity(rows);
1003            for row in 0..rows {
1004                builder.append_option(field(row).and_then(|value| value.as_raw().to_bool().ok()));
1005            }
1006            Arc::new(builder.finish())
1007        }
1008    };
1009    Ok(ColumnarValue::Array(array))
1010}
1011
/// Turns a raw DataFusion error message into one that names its own fix.
///
/// The message is checked against a fixed list of substrings for the error
/// classes agents actually hit; on the first match a `hint:` line is appended
/// so a failed call teaches the correct next query instead of starting a
/// guessing loop. Messages that match nothing are returned unchanged.
fn enrich(message: &str) -> String {
    // (substring of the error, recovery hint). Order matters: first match wins.
    const HINTS: &[(&str, &str)] = &[
        (
            "No field named",
            "columns are messages(session_id, message_id, timestamp, role, source_agent, \
             project, content [system-role only], search_text [the conversational text], \
             embedding_model, options) | sessions(session_id, parent_session_id, \
             parent_message_id, source_agent, created_at, project, options) | \
             parts(session_id, message_id, id, ordinal, type, provenance, variant_data, \
             options). Part bodies (tool params/results, text) live in parts.variant_data - \
             read them with json_extract(variant_data, '$.field'). For text search use \
             contains_tokens(search_text, '...') in WHERE, or the fts('messages', ...) \
             table function in FROM for ranked results; to read a transcript use pond_get. \
             Full doc: resource schema://pond-sql.",
        ),
        (
            "Encountered non UTF-8 data",
            "JSON columns (variant_data, options) are binary JSONB - CAST / ::text does not \
             work on them. Stringify the whole value with json_extract(col, '$'), or fetch \
             one field with json_extract(col, '$.field').",
        ),
        (
            "Resources exhausted",
            "the query ran out of memory - usually from carrying whole JSON columns \
             (variant_data, options) through a join or sort. Project narrow fields with \
             json_extract(col, '$.field') instead of whole columns, filter before joining, \
             or export the full set with format=parquet.",
        ),
        (
            "LIKE prefix queries are not supported for bitmap indexes",
            "prefix LIKE ('x%') and starts_with() fail on bitmap-indexed columns \
             (messages.source_agent, messages.role). Use equality, \
             split_part(source_agent, '/', 1) = '...', or an infix pattern (LIKE '%x%').",
        ),
        (
            "call to 'json_",
            "JSON function signatures: json_get_string|json_get_int|json_get_float|\
             json_get_bool(col, 'key', ...) walk a key path (array steps by numeric \
             index); json_get(col, 'key') returns JSONB for chaining; json_extract(col, \
             '$.a.b') takes a JSONPath and returns JSON text of any value (the right tool \
             for deeply nested or mixed-type fields).",
        ),
        (
            "Invalid function 'json",
            "available JSON functions: json_get_string, json_get_int, json_get_float, \
             json_get_bool (col, 'key', ...); json_get(col, 'key') -> JSONB for chaining; \
             json_extract(col, '$.a.b') -> JSON text; json_array_contains; \
             json_array_length. See resource schema://pond-sql.",
        ),
        (
            // Defensive: lance's fts `boolean` query can plan a CollectLeft
            // HashJoin over multi-partition match arms, which the optimizer
            // does not always repair (works through pond's vendored fts()
            // provider; kept for any path that still trips it).
            "does not satisfy distribution requirements",
            "this fts query shape planned an unexecutable join. For AND semantics use a \
             single match query with operator And: fts('messages', \
             '{\"match\":{\"column\":\"search_text\",\"terms\":\"a b\",\"operator\":\"And\"}}'), \
             optionally with LIKE post-filters in WHERE.",
        ),
        (
            "position is not found but required for phrase queries",
            "the full-text index is built without positions, so \"phrase\" queries are \
             unavailable. Use a match query with operator And plus LIKE post-filters for \
             exact-substring matching.",
        ),
    ];
    match HINTS
        .iter()
        .find(|(pattern, _)| message.contains(pattern))
    {
        Some((_, hint)) => format!("{message}\nhint: {hint}"),
        None => message.to_owned(),
    }
}
1089
1090/// Decode lance JSONB columns to JSON text, then drop columns that don't render
1091/// readably (the embedding `vector` FixedSizeList and any leftover binary).
1092fn displayable(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
1093    let decoded = lance_arrow::json::convert_lance_json_to_arrow(batch)?;
1094    let keep: Vec<usize> = decoded
1095        .schema()
1096        .fields()
1097        .iter()
1098        .enumerate()
1099        .filter(|(_, field)| is_displayable(field.data_type()))
1100        .map(|(index, _)| index)
1101        .collect();
1102    decoded.project(&keep)
1103}
1104
1105fn is_displayable(data_type: &DataType) -> bool {
1106    !matches!(
1107        data_type,
1108        DataType::FixedSizeList(_, _)
1109            | DataType::Binary
1110            | DataType::LargeBinary
1111            | DataType::BinaryView
1112            | DataType::FixedSizeBinary(_)
1113    )
1114}
1115
1116/// One physical line per row: embedded newlines in cell values (markdown,
1117/// multi-line commands) otherwise explode a row across many table lines that
1118/// hard-wrap unreadably in narrow clients. The literal two-char `\n` matches
1119/// the JSON escaping agents already read, and keeps row boundaries
1120/// unambiguous. Inline table mode only - json and export modes keep raw data.
1121fn collapse_newlines(batches: &[RecordBatch]) -> Result<Vec<RecordBatch>, ArrowError> {
1122    fn escape<O: OffsetSizeTrait>(array: &GenericStringArray<O>) -> ArrayRef {
1123        let escaped: GenericStringArray<O> =
1124            array.iter().map(|value| value.map(escape_cell)).collect();
1125        Arc::new(escaped)
1126    }
1127    fn escape_cell(text: &str) -> std::borrow::Cow<'_, str> {
1128        if text.contains(['\n', '\r']) {
1129            std::borrow::Cow::Owned(text.replace("\r\n", "\\n").replace(['\n', '\r'], "\\n"))
1130        } else {
1131            std::borrow::Cow::Borrowed(text)
1132        }
1133    }
1134    batches
1135        .iter()
1136        .map(|batch| {
1137            let columns: Vec<ArrayRef> = batch
1138                .columns()
1139                .iter()
1140                .map(|array| match array.data_type() {
1141                    DataType::Utf8 => array
1142                        .as_any()
1143                        .downcast_ref::<StringArray>()
1144                        .map_or_else(|| array.clone(), escape),
1145                    DataType::LargeUtf8 => array
1146                        .as_any()
1147                        .downcast_ref::<GenericStringArray<i64>>()
1148                        .map_or_else(|| array.clone(), escape),
1149                    DataType::Utf8View => array
1150                        .as_any()
1151                        .downcast_ref::<StringViewArray>()
1152                        .map_or_else(
1153                            || array.clone(),
1154                            |view| {
1155                                let escaped: StringViewArray =
1156                                    view.iter().map(|value| value.map(escape_cell)).collect();
1157                                Arc::new(escaped)
1158                            },
1159                        ),
1160                    _ => array.clone(),
1161                })
1162                .collect();
1163            RecordBatch::try_new(batch.schema(), columns)
1164        })
1165        .collect()
1166}
1167
1168fn render_inline(
1169    display: &[RecordBatch],
1170    max_rows: usize,
1171    elapsed: Duration,
1172) -> Result<String, ArrowError> {
1173    let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1174    let elapsed_ms = elapsed.as_millis();
1175    if total == 0 {
1176        // Still render the header so the caller sees the result columns.
1177        return Ok(format!(
1178            "0 rows ({elapsed_ms} ms).\n{}",
1179            pretty_format_batches(display)?
1180        ));
1181    }
1182    let render = |shown: usize| -> Result<String, ArrowError> {
1183        let limited = collapse_newlines(&limit_batches(display, shown))?;
1184        Ok(pretty_format_batches(&limited)?.to_string())
1185    };
1186    let mut shown = total.min(max_rows);
1187    let mut table = render(shown)?;
1188    while table.len() > INLINE_BUDGET_BYTES && shown > 1 {
1189        shown = (shown / 2).max(1);
1190        table = render(shown)?;
1191    }
1192    let mut out = format!("{total} row(s) in {elapsed_ms} ms; showing {shown}.\n{table}");
1193    if shown < total {
1194        out.push_str(&format!(
1195            "\n... {} row(s) omitted. To page: ORDER BY <indexed col> (e.g. timestamp, \
1196             message_id), then in the next call add `WHERE (col, message_id) < \
1197             (<last_col>, <last_message_id>)` - keyset pagination, see schema://pond-sql. \
1198             For the full set: format=parquet or format=ndjson.",
1199            total - shown
1200        ));
1201    }
1202    Ok(out)
1203}
1204
1205fn limit_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<RecordBatch> {
1206    let mut out = Vec::new();
1207    let mut remaining = max_rows;
1208    for batch in batches {
1209        if remaining == 0 {
1210            break;
1211        }
1212        if batch.num_rows() <= remaining {
1213            remaining -= batch.num_rows();
1214            out.push(batch.clone());
1215        } else {
1216            out.push(batch.slice(0, remaining));
1217            remaining = 0;
1218        }
1219    }
1220    out
1221}
1222
1223fn encode_parquet(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1224    let schema = batches
1225        .first()
1226        .map(RecordBatch::schema)
1227        .ok_or_else(|| SqlError::Query("query returned no columns to export".to_owned()))?;
1228    let mut buffer = Vec::new();
1229    let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
1230        .map_err(|error| SqlError::Infra(anyhow!("parquet init failed: {error}")))?;
1231    for batch in batches {
1232        writer
1233            .write(batch)
1234            .map_err(|error| SqlError::Infra(anyhow!("parquet write failed: {error}")))?;
1235    }
1236    writer
1237        .close()
1238        .map_err(|error| SqlError::Infra(anyhow!("parquet close failed: {error}")))?;
1239    Ok(buffer)
1240}
1241
1242fn encode_ndjson(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1243    let mut buffer = Vec::new();
1244    {
1245        let mut writer = LineDelimitedWriter::new(&mut buffer);
1246        let refs: Vec<&RecordBatch> = batches.iter().collect();
1247        writer
1248            .write_batches(&refs)
1249            .map_err(|error| SqlError::Infra(anyhow!("ndjson write failed: {error}")))?;
1250        writer
1251            .finish()
1252            .map_err(|error| SqlError::Infra(anyhow!("ndjson finish failed: {error}")))?;
1253    }
1254    Ok(buffer)
1255}
1256
1257#[cfg(test)]
1258mod tests {
1259    #![allow(clippy::expect_used)]
1260
1261    use super::*;
1262
1263    fn rejected(sql: &str) -> bool {
1264        matches!(parse_and_gate(sql), Err(SqlError::Query(_)))
1265    }
1266
1267    fn parses_as(sql: &str, expected: StatementKind) -> bool {
1268        match parse_and_gate(sql) {
1269            Ok(parsed) => matches!(
1270                (&parsed.kind, &expected),
1271                (StatementKind::Query, StatementKind::Query)
1272                    | (StatementKind::Explain, StatementKind::Explain)
1273            ),
1274            Err(_) => false,
1275        }
1276    }
1277
1278    #[test]
1279    fn mentions_table_is_sound_for_open_pruning() {
1280        // Referenced => must be detected (no false negative, else a valid query
1281        // breaks). Case-insensitive: DataFusion lowercases identifiers.
1282        assert!(mentions_table("SELECT * FROM messages", "messages"));
1283        assert!(mentions_table("select * from MESSAGES", "messages"));
1284        assert!(mentions_table(
1285            "SELECT s.id FROM sessions s JOIN parts p ON s.id = p.session_id",
1286            "parts",
1287        ));
1288        assert!(mentions_table(
1289            "SELECT * FROM fts('messages', '{\"match\":{}}')",
1290            "messages",
1291        ));
1292        assert!(mentions_table(
1293            "WITH x AS (SELECT * FROM sessions) SELECT * FROM x",
1294            "sessions",
1295        ));
1296        // Not referenced => skip the open. Word boundary: a longer identifier
1297        // that merely contains the name is not a reference.
1298        assert!(!mentions_table("SELECT * FROM messages", "parts"));
1299        assert!(!mentions_table("SELECT * FROM messages", "sessions"));
1300        assert!(!mentions_table(
1301            "SELECT counterparts FROM messages",
1302            "parts"
1303        ));
1304    }
1305
1306    #[test]
1307    fn allows_single_select_and_cte() {
1308        assert!(parses_as("SELECT 1", StatementKind::Query));
1309        assert!(parses_as(
1310            "SELECT role, count(*) FROM messages GROUP BY role",
1311            StatementKind::Query
1312        ));
1313        assert!(parses_as(
1314            "WITH t AS (SELECT 1 AS a) SELECT a FROM t",
1315            StatementKind::Query
1316        ));
1317    }
1318
1319    #[test]
1320    fn allows_explain_of_select() {
1321        assert!(parses_as("EXPLAIN SELECT 1", StatementKind::Explain));
1322        assert!(parses_as(
1323            "EXPLAIN ANALYZE SELECT role FROM messages",
1324            StatementKind::Explain
1325        ));
1326    }
1327
1328    #[test]
1329    fn rejects_explain_of_non_query() {
1330        // EXPLAIN of a side-effecting statement: the inner statement is what
1331        // would matter; reject to keep the surface tight.
1332        assert!(rejected("EXPLAIN INSERT INTO messages VALUES ('x')"));
1333    }
1334
1335    #[test]
1336    fn rejects_writes_and_side_effects() {
1337        assert!(rejected("INSERT INTO messages VALUES ('x')"));
1338        assert!(rejected("UPDATE messages SET role = 'x'"));
1339        assert!(rejected("DELETE FROM messages"));
1340        assert!(rejected("CREATE TABLE t (x INT)"));
1341        assert!(rejected("CREATE VIEW v AS SELECT 1"));
1342        assert!(rejected("DROP TABLE messages"));
1343        assert!(rejected(
1344            "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION '/etc'"
1345        ));
1346        assert!(rejected("COPY (SELECT 1) TO '/tmp/x.parquet'"));
1347        assert!(rejected("SET a = 1"));
1348    }
1349
1350    #[test]
1351    fn rejects_multiple_statements() {
1352        assert!(rejected("SELECT 1; SELECT 2"));
1353        assert!(rejected("SELECT 1; DROP TABLE messages"));
1354    }
1355
1356    #[test]
1357    fn rejects_unparseable() {
1358        assert!(rejected("NOT SQL AT ALL ;;"));
1359    }
1360
1361    fn mentions_vector(sql: &str) -> bool {
1362        match parse_and_gate(sql) {
1363            Ok(parsed) => projection_mentions_vector(parsed.projection_query()),
1364            Err(_) => false,
1365        }
1366    }
1367
1368    #[test]
1369    fn explicit_vector_projection_is_rejected() {
1370        assert!(mentions_vector("SELECT vector FROM messages"));
1371        assert!(mentions_vector("SELECT id, vector FROM messages"));
1372        assert!(mentions_vector("SELECT m.vector FROM messages m"));
1373        assert!(mentions_vector("SELECT array_length(vector) FROM messages"));
1374        assert!(mentions_vector("EXPLAIN SELECT vector FROM messages"));
1375    }
1376
1377    #[test]
1378    fn enrich_appends_recovery_hints() {
1379        // One literal error string per class, captured from real failed calls.
1380        let cases = [
1381            (
1382                "SQL error: Schema error: No field named created_at.",
1383                "schema://pond-sql",
1384            ),
1385            (
1386                "SQL error: External error: Arrow error: Invalid argument error: \
1387                 Encountered non UTF-8 data",
1388                "json_extract",
1389            ),
1390            (
1391                "SQL error: External error: Not supported: LIKE prefix queries are not \
1392                 supported for bitmap indexes",
1393                "split_part",
1394            ),
1395            (
1396                "SQL error: Error during planning: Failed to coerce arguments to satisfy \
1397                 a call to 'json_get_string' function",
1398                "JSONPath",
1399            ),
1400            (
1401                "SQL error: Error during planning: Invalid function 'json_get_json'.",
1402                "json_extract",
1403            ),
1404            (
1405                "SQL error: Resources exhausted: Additional allocation failed for \
1406                 HashJoinInput[0] with top memory consumers",
1407                "json_extract",
1408            ),
1409        ];
1410        for (raw, marker) in cases {
1411            let enriched = enrich(raw);
1412            assert!(enriched.starts_with(raw), "original kept: {enriched}");
1413            assert!(enriched.contains("hint:"), "hint appended: {enriched}");
1414            assert!(enriched.contains(marker), "hint names the fix: {enriched}");
1415        }
1416        // Unrecognized errors pass through untouched.
1417        assert_eq!(
1418            enrich("SQL error: division by zero"),
1419            "SQL error: division by zero"
1420        );
1421    }
1422
1423    #[test]
1424    fn select_star_and_where_vector_are_allowed() {
1425        // `SELECT *` falls through to the existing silent-strip in displayable.
1426        assert!(!mentions_vector("SELECT * FROM messages"));
1427        // Filtering on `vector` is documented as legal (`vector IS NOT NULL`).
1428        assert!(!mentions_vector(
1429            "SELECT message_id FROM messages WHERE vector IS NOT NULL"
1430        ));
1431    }
1432
1433    #[test]
1434    fn jsonb_cast_misuse_detects_cast_and_coloncolon() {
1435        for sql in [
1436            "SELECT CAST(variant_data AS VARCHAR) FROM parts",
1437            "SELECT cast(p.variant_data as text) FROM parts p",
1438            "SELECT variant_data::text FROM parts",
1439            "SELECT p.variant_data :: varchar FROM parts p",
1440            "SELECT options::text FROM messages",
1441            "SELECT lower(CAST(variant_data AS VARCHAR)) FROM parts",
1442        ] {
1443            assert!(jsonb_cast_misuse(sql), "should reject: {sql}");
1444        }
1445    }
1446
1447    #[test]
1448    fn jsonb_cast_misuse_allows_legitimate_use() {
1449        for sql in [
1450            "SELECT json_extract(variant_data, '$') FROM parts",
1451            "SELECT json_get_string(variant_data, 'name') FROM parts",
1452            "SELECT CAST(ordinal AS BIGINT) FROM parts",
1453            "SELECT timestamp::date FROM messages",
1454            // `options` as part of a longer identifier is not the column.
1455            "SELECT my_options::text FROM t",
1456            "SELECT CAST(json_extract(variant_data, '$.x') AS BIGINT) FROM parts",
1457        ] {
1458            assert!(!jsonb_cast_misuse(sql), "should allow: {sql}");
1459        }
1460    }
1461
1462    #[test]
1463    fn jsonb_fulldoc_like_scan_detects_whole_document_substring() {
1464        for sql in [
1465            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE '%needle%'",
1466            "SELECT * FROM parts p WHERE lower(json_extract(p.variant_data, '$')) LIKE '%x%'",
1467            "SELECT * FROM messages WHERE json_extract(options, '$') ILIKE '%y%'",
1468            "SELECT * FROM parts WHERE json_extract(variant_data,'$') NOT LIKE '%z%'",
1469            // The real timeout shape: day-scoped join still scans every part.
1470            "SELECT p.message_id FROM parts p JOIN messages m ON p.message_id = m.message_id \
1471             WHERE m.timestamp >= '2026-06-11' AND lower(json_extract(p.variant_data, '$')) \
1472             LIKE '%weekly limit%'",
1473        ] {
1474            assert!(jsonb_fulldoc_like_scan(sql), "should reject: {sql}");
1475        }
1476    }
1477
1478    #[test]
1479    fn jsonb_fulldoc_like_scan_allows_targeted_and_nonleading() {
1480        for sql in [
1481            // single-field extract, not the whole document
1482            "SELECT * FROM parts WHERE json_extract(variant_data, '$.name') LIKE '%x%'",
1483            // non-leading (prefix) pattern can be served without a full stringify
1484            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE 'pre%'",
1485            // plain text LIKE has no whole-document stringify
1486            "SELECT * FROM messages WHERE search_text LIKE '%x%'",
1487            // indexed predicate, the path agents should take
1488            "SELECT * FROM messages WHERE contains_tokens(search_text, 'x')",
1489            // projecting the stringified value is fine; no LIKE scan
1490            "SELECT json_extract(variant_data, '$') FROM parts LIMIT 1",
1491        ] {
1492            assert!(!jsonb_fulldoc_like_scan(sql), "should allow: {sql}");
1493        }
1494    }
1495
1496    #[test]
1497    fn render_inline_collapses_newlines_in_cells() {
1498        let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Utf8, true)]));
1499        let batch = RecordBatch::try_new(
1500            schema,
1501            vec![Arc::new(StringArray::from(vec![Some(
1502                "line one\nline two\r\nline three",
1503            )]))],
1504        )
1505        .expect("single-column batch");
1506        let out = render_inline(&[batch], 10, Duration::from_millis(1)).expect("render succeeds");
1507        assert!(
1508            out.contains("line one\\nline two\\nline three"),
1509            "newlines collapse to literal \\n: {out}"
1510        );
1511        // The data row renders as one physical line: header rule, header,
1512        // rule, row, rule - the row itself never wraps.
1513        let row_lines: Vec<&str> = out
1514            .lines()
1515            .filter(|line| line.contains("line one"))
1516            .collect();
1517        assert_eq!(row_lines.len(), 1, "one physical line per row: {out}");
1518    }
1519}