Skip to main content

pond/
sql.rs

1//! `pond_sql_query`: read-only DataFusion SQL over the three Lance tables
2//! (`sessions` / `messages` / `parts`), registered as `LanceTableProvider`s
3//! (behind plan-time views that rename `id` to `message_id` / `session_id`)
4//! on a fresh per-call `SessionContext`. Read-only is enforced in two layers - a
5//! single-`SELECT` pre-parse and `sql_with_options` with DDL/DML/statements all
6//! disabled - so no statement that mutates the corpus or touches the filesystem
7//! (INSERT/UPDATE/DELETE/CREATE/DROP/COPY/CREATE EXTERNAL TABLE/SET) can run.
8//! Results render inline (row-capped) or export to a parquet/ndjson file the
9//! caller fetches via the `pond-sql-export://` resource (`src/transport.rs`).
10
11use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use anyhow::anyhow;
16use arrow_json::LineDelimitedWriter;
17use lance::Dataset;
18use lance::datafusion::LanceTableProvider;
19use lance::deps::arrow_array::builder::{
20    BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
21};
22use lance::deps::arrow_array::{
23    Array, ArrayRef, GenericStringArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
24    StringArray, StringViewArray,
25};
26use lance::deps::arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
27use lance::deps::datafusion::arrow::util::pretty::pretty_format_batches;
28use lance::deps::datafusion::catalog::{Session, TableFunctionImpl, TableProvider};
29use lance::deps::datafusion::common::ScalarValue;
30use lance::deps::datafusion::datasource::{ViewTable, provider_as_source};
31use lance::deps::datafusion::error::DataFusionError;
32use lance::deps::datafusion::execution::SessionStateBuilder;
33use lance::deps::datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use lance::deps::datafusion::logical_expr::{
35    ColumnarValue, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
36    TypeSignature, Volatility,
37};
38use lance::deps::datafusion::logical_expr::{Expr, TableType};
39use lance::deps::datafusion::physical_plan::ExecutionPlan;
40use lance::deps::datafusion::prelude::{SQLOptions, SessionConfig, SessionContext, col};
41use lance::deps::datafusion::sql::parser::{DFParser, Statement as DfStatement};
42use lance::deps::datafusion::sql::sqlparser::ast::{SetExpr, Statement as SqlStatement};
43use lance_arrow::SchemaExt;
44use lance_datafusion::udf::register_functions;
45use lance_index::scalar::FullTextSearchQuery;
46use lance_index::scalar::inverted::parser::from_json;
47use parquet::arrow::ArrowWriter;
48use serde_json::{Map as JsonMap, Value as JsonValue, json};
49
/// Memory ceiling (512 MiB) handed to the DataFusion runtime for one query.
/// DataFusion does not enforce it on every operator, so the wall-clock
/// timeout below is the hard backstop.
const MEM_LIMIT_BYTES: usize = 512 << 20;
/// Longest a `collect()` may run. DataFusion 53 ships no query timeout of its
/// own, so the `tokio::time::timeout` built from this value is the only guard
/// against a runaway plan.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Size budget, in bytes, for the inline (rendered table) result; rows are
/// dropped until the rendering fits.
const INLINE_BUDGET_BYTES: usize = 80_000;
/// Largest export artifact allowed (100 MiB). Base64-encoded over
/// `resources/read` it costs ~1.33x this in the response, so it stays well
/// under any process envelope.
const MAX_EXPORT_BYTES: usize = 100 << 20;
/// Inline row cap applied when the caller passes no `limit`.
pub const DEFAULT_INLINE_ROWS: usize = 100;
/// Largest inline `limit` a caller may ask for.
pub const MAX_INLINE_ROWS: usize = 1_000;
65
/// Serialization format of an export artifact. Before encoding, vector
/// columns are dropped and JSON columns are decoded to text (see
/// [`displayable`]).
#[derive(Clone, Copy, Debug)]
pub enum Format {
    Parquet,
    Ndjson,
}

impl Format {
    /// `(file extension, MIME type)` for this format; the single source both
    /// public accessors read from.
    fn descriptor(self) -> (&'static str, &'static str) {
        match self {
            Format::Parquet => ("parquet", "application/vnd.apache.parquet"),
            Format::Ndjson => ("ndjson", "application/x-ndjson"),
        }
    }

    /// File extension (without the leading dot) for an export in this format.
    pub fn ext(self) -> &'static str {
        self.descriptor().0
    }

    /// MIME type advertised for an export in this format.
    pub fn mime(self) -> &'static str {
        self.descriptor().1
    }
}
89
90/// How `pond_sql_query` returns results.
91#[derive(Debug, Clone, Copy)]
92pub enum Mode {
93    /// Render a row-capped table into the tool result.
94    Inline,
95    /// Return a row-capped JSON payload; the MCP layer surfaces it through
96    /// `structuredContent` (with a stringified text fallback for clients that
97    /// do not surface the structured channel). Empirically validated on Claude
98    /// Code 2.1.165: when both channels carry the same payload, the agent reads
99    /// the structured one and the text block is a soft-landing for other
100    /// clients (spec 2025-11-25 server SHOULD).
101    InlineJson,
102    /// Write the full result to a file and return a `pond-sql-export://` link.
103    Export(Format),
104}
105
106/// The three Lance datasets, fetched fresh per call so each query sees a
107/// current snapshot (the handle freshness gate runs on each `Store::dataset`).
108pub struct Tables {
109    pub sessions: Arc<Dataset>,
110    pub messages: Arc<Dataset>,
111    pub parts: Arc<Dataset>,
112}
113
114/// Result of a successful `run`.
115pub enum Outcome {
116    /// A rendered, row-capped table (already includes the metrics footer).
117    Inline(String),
118    /// A row-capped JSON payload with metadata fields (`total_rows`,
119    /// `shown_rows`, `truncated`, `elapsed_ms`, `columns`, `rows`).
120    InlineJson(JsonValue),
121    /// Encoded export bytes plus metadata for the caller's summary/resource.
122    Export {
123        bytes: Vec<u8>,
124        format: Format,
125        rows: usize,
126        columns: Vec<String>,
127    },
128}
129
130/// Two error channels: `Query` is caller-fixable (parse/plan/exec/limits) and
131/// the tool surfaces it as an `isError` result so the model self-corrects;
132/// `Infra` is an internal failure surfaced as a protocol error.
133#[derive(Debug)]
134pub enum SqlError {
135    Query(String),
136    Infra(anyhow::Error),
137}
138
139fn infra(error: ArrowError) -> SqlError {
140    SqlError::Infra(anyhow::Error::new(error))
141}
142
143/// Execute one read-only SQL query and return either a rendered table, a JSON
144/// payload, or encoded export bytes.
145pub async fn run(
146    tables: &Tables,
147    sql: &str,
148    mode: Mode,
149    inline_rows: usize,
150) -> Result<Outcome, SqlError> {
151    let parsed = parse_and_gate(sql)?;
152    if matches!(parsed.kind, StatementKind::Explain) && matches!(mode, Mode::Export(_)) {
153        return Err(SqlError::Query(
154            "EXPLAIN returns a plan, not a result set; use format=text (or json) to read it"
155                .to_owned(),
156        ));
157    }
158    if projection_mentions_vector(parsed.projection_query()) {
159        return Err(SqlError::Query(
160            "the `vector` column is not selectable from pond_sql_query (it is a \
161             FixedSizeList<f32> embedding, ~600 bytes per row and not useful in a result). \
162             For semantic search use pond_search. Filtering on it is allowed in WHERE \
163             (e.g. `vector IS NOT NULL`)."
164                .to_owned(),
165        ));
166    }
167    if jsonb_cast_misuse(sql) {
168        return Err(SqlError::Query(
169            "CAST / `::` does not work on the binary JSONB columns (variant_data, options) - \
170             when the bytes happen to be valid text it can even silently return garbage. \
171             Stringify the whole value with json_extract(col, '$') or read one field with \
172             json_extract(col, '$.field')."
173                .to_owned(),
174        ));
175    }
176    if jsonb_fulldoc_like_scan(sql) {
177        return Err(SqlError::Query(
178            "a leading-wildcard LIKE over the whole JSONB document - \
179             json_extract(variant_data, '$') LIKE '%...%' - stringifies and scans every row, \
180             so over parts it will not finish within the time limit. There is no substring \
181             index on tool bodies yet (TODO #47: lance v8 FM-Index). Instead match a single \
182             field with json_extract(variant_data, '$.field') LIKE '...', scope to one session \
183             with session_id = '<id>' and read it with pond_get, or search conversational text \
184             with contains_tokens(search_text, '...')."
185                .to_owned(),
186        ));
187    }
188    let ctx = build_context()?;
189    register(&ctx, tables)?;
190
191    // Defense in depth on top of the pre-parse gate: SQLOptions blocks DDL/DML
192    // at planning time. `allow_statements` stays false for a plain SELECT (the
193    // parse-time gate already rejects SET/SHOW etc.) but must be true for
194    // EXPLAIN, which DataFusion classifies as a Statement node. The inner
195    // query of an EXPLAIN was vetted by the gate above.
196    let options = SQLOptions::new()
197        .with_allow_ddl(false)
198        .with_allow_dml(false)
199        .with_allow_statements(matches!(parsed.kind, StatementKind::Explain));
200    let df = ctx
201        .sql_with_options(sql, options)
202        .await
203        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;
204
205    // Captured before `collect()` consumes `df`, so an empty result still
206    // renders its column headers.
207    let result_schema = Arc::new(df.schema().as_arrow().clone());
208    let started = Instant::now();
209    // TODO(#47): substring hunts inside parts.variant_data (json_extract +
210    // LIKE full scans) are the dominant real-world cause of this timeout. The
211    // planned fix is lance v8's FM-Index on variant_data (raw-byte substring
212    // search via `contains(variant_data, 'needle')`); until it lands, the
213    // message steers agents to predicates the current indexes can serve.
214    let collected = tokio::time::timeout(QUERY_TIMEOUT, df.collect())
215        .await
216        .map_err(|_| {
217            SqlError::Query(format!(
218                "query exceeded the {}s limit; add a narrower WHERE or a LIMIT. If you were \
219                 substring-scanning variant_data (json_extract + LIKE), there is no \
220                 substring index on tool bodies yet: filter parts by type and \
221                 json_get_string(variant_data, 'name') first, or search conversational \
222                 text with contains_tokens(search_text, '...') instead.",
223                QUERY_TIMEOUT.as_secs()
224            ))
225        })?
226        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;
227    let elapsed = started.elapsed();
228
229    let display: Vec<RecordBatch> = if collected.is_empty() {
230        vec![displayable(&RecordBatch::new_empty(result_schema)).map_err(infra)?]
231    } else {
232        collected
233            .iter()
234            .map(displayable)
235            .collect::<Result<_, _>>()
236            .map_err(infra)?
237    };
238
239    match mode {
240        Mode::Inline => Ok(Outcome::Inline(
241            render_inline(&display, inline_rows, elapsed).map_err(infra)?,
242        )),
243        Mode::InlineJson => Ok(Outcome::InlineJson(render_inline_json(
244            &display,
245            inline_rows,
246            elapsed,
247        )?)),
248        Mode::Export(format) => {
249            let rows = display.iter().map(RecordBatch::num_rows).sum();
250            let columns = display
251                .first()
252                .map(|batch| {
253                    batch
254                        .schema()
255                        .fields()
256                        .iter()
257                        .map(|field| field.name().clone())
258                        .collect::<Vec<_>>()
259                })
260                .unwrap_or_default();
261            let bytes = match format {
262                Format::Parquet => encode_parquet(&display)?,
263                Format::Ndjson => encode_ndjson(&display)?,
264            };
265            if bytes.len() > MAX_EXPORT_BYTES {
266                return Err(SqlError::Query(format!(
267                    "export is {} bytes, over the {MAX_EXPORT_BYTES} byte limit; \
268                     narrow the query or aggregate",
269                    bytes.len()
270                )));
271            }
272            Ok(Outcome::Export {
273                bytes,
274                format,
275                rows,
276                columns,
277            })
278        }
279    }
280}
281
/// The two top-level statement shapes the read-only gate lets through.
#[derive(Clone, Copy, Debug, Eq, PartialEq)]
enum StatementKind {
    /// An ordinary `Query` (SELECT/WITH/VALUES/UNION).
    Query,
    /// `EXPLAIN [ANALYZE] <query>` - planning information only, no mutation.
    Explain,
}
290
291/// Parsed top-level statement, normalized so downstream checks always see a
292/// projection-bearing `Query` regardless of whether the user wrote `SELECT`
293/// or `EXPLAIN SELECT`. DataFusion's parser wraps EXPLAIN in its own
294/// `DfStatement::Explain` variant (separate from sqlparser's
295/// `SqlStatement::Explain`), so the gate has to peel both layers.
296struct ParsedStatement {
297    kind: StatementKind,
298    query: lance::deps::datafusion::sql::sqlparser::ast::Query,
299}
300
301impl ParsedStatement {
302    fn projection_query(&self) -> &lance::deps::datafusion::sql::sqlparser::ast::Query {
303        &self.query
304    }
305}
306
307/// Read-only gate: parse the SQL and require exactly one top-level `Query` or
308/// `EXPLAIN <Query>`. Rejects DDL/DML/COPY/SET/SHOW and multi-statement input,
309/// which `SQLOptions` alone does not catch at planning time. EXPLAIN of a
310/// non-Query (e.g. `EXPLAIN INSERT ...`) is also rejected: EXPLAIN itself is
311/// read-only, but letting the inner shape be DDL/DML widens the surface area
312/// the gate has to reason about for no real agent gain.
313fn parse_and_gate(sql: &str) -> Result<ParsedStatement, SqlError> {
314    let statements = DFParser::parse_sql(sql)
315        .map_err(|error| SqlError::Query(format!("SQL parse error: {error}")))?;
316    if statements.len() != 1 {
317        return Err(SqlError::Query(
318            "pond_sql_query runs exactly one statement; submit a single SELECT".to_owned(),
319        ));
320    }
321    let Some(front) = statements.front() else {
322        return Err(read_only_rejection());
323    };
324    match front {
325        DfStatement::Statement(boxed) => match boxed.as_ref() {
326            SqlStatement::Query(query) => Ok(ParsedStatement {
327                kind: StatementKind::Query,
328                query: query.as_ref().clone(),
329            }),
330            _ => Err(read_only_rejection()),
331        },
332        DfStatement::Explain(explain) => match explain.statement.as_ref() {
333            DfStatement::Statement(inner) => match inner.as_ref() {
334                SqlStatement::Query(query) => Ok(ParsedStatement {
335                    kind: StatementKind::Explain,
336                    query: query.as_ref().clone(),
337                }),
338                _ => Err(read_only_rejection()),
339            },
340            _ => Err(read_only_rejection()),
341        },
342        _ => Err(read_only_rejection()),
343    }
344}
345
346fn read_only_rejection() -> SqlError {
347    SqlError::Query(
348        "pond_sql_query is read-only: only a single SELECT/WITH (or EXPLAIN of one) is \
349         allowed (no INSERT/UPDATE/DELETE/CREATE/DROP/COPY/SET)"
350            .to_owned(),
351    )
352}
353
354/// Reject any top-level projection that explicitly references the embedding
355/// `vector` column. Today such queries silently return an empty column (the
356/// FixedSizeList<f32> is stripped by `displayable`), which wastes agent tokens
357/// diagnosing. WHERE/HAVING references stay legal - the doc lets agents filter
358/// on it (e.g. `WHERE vector IS NOT NULL`); only projecting the column out is
359/// blocked. Heuristic: tokenize each top-level SELECT item and look for a bare
360/// `vector` identifier. Covers `SELECT vector`, `SELECT id, vector`,
361/// `SELECT m.vector`, and `SELECT array_length(vector)`. Wildcards (`*` /
362/// `messages.*`) keep the existing silent-strip behavior since they don't name
363/// the column explicitly.
364fn projection_mentions_vector(query: &lance::deps::datafusion::sql::sqlparser::ast::Query) -> bool {
365    walk_set_expr_for_vector(query.body.as_ref())
366}
367
368fn walk_set_expr_for_vector(expr: &SetExpr) -> bool {
369    match expr {
370        SetExpr::Select(select) => select
371            .projection
372            .iter()
373            .any(|item| mentions_vector_token(&item.to_string())),
374        SetExpr::Query(inner) => walk_set_expr_for_vector(inner.body.as_ref()),
375        SetExpr::SetOperation { left, right, .. } => {
376            walk_set_expr_for_vector(left) || walk_set_expr_for_vector(right)
377        }
378        _ => false,
379    }
380}
381
/// Report whether `text` contains `vector` as a whole identifier token.
///
/// `text` is split on every character that is neither alphanumeric nor `_`,
/// so `m.vector` and `array_length(vector)` match while `vectors` and
/// `my_vector` do not.
///
/// The comparison ignores ASCII case. With DataFusion's default identifier
/// normalization an unquoted `VECTOR` or `Vector` resolves to the same
/// column as `vector`, so a case-sensitive match would let those spellings
/// through.
fn mentions_vector_token(text: &str) -> bool {
    text.split(|c: char| !(c.is_alphanumeric() || c == '_'))
        .any(|token| token.eq_ignore_ascii_case("vector"))
}
386
/// Plan-time gate: does `sql` apply CAST or `::` to one of the binary JSONB
/// columns (`variant_data`, `options`)?
///
/// The runtime failure depends on the data - CAST only errors once it meets a
/// non-UTF8 byte, and JSONB header bytes are often valid ASCII, so it can
/// silently "succeed" and return binary garbage. The query is therefore
/// rejected before any scan. This is a token-scan heuristic in the spirit of
/// `projection_mentions_vector`; an aliased column that slips through still
/// hits the `enrich` runtime hint.
///
/// Two shapes are recognized, case-insensitively:
/// * `<col> :: <type>`, where `<col>` is a whole identifier
/// * `CAST(<qualifier.>col AS <type>`, with at most one qualifier
fn jsonb_cast_misuse(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const CAST: &str = "cast";

    /// Byte that may appear inside an identifier.
    fn is_ident(byte: u8) -> bool {
        byte == b'_' || byte.is_ascii_alphanumeric()
    }

    /// Drop one leading `qualifier.` when everything before the first dot is
    /// identifier characters; otherwise hand the operand back unchanged.
    fn strip_qualifier(operand: &str) -> &str {
        match operand.find('.') {
            Some(dot) if dot > 0 && operand.as_bytes()[..dot].iter().all(|b| is_ident(*b)) => {
                &operand[dot + 1..]
            }
            _ => operand,
        }
    }

    // ASCII lowercasing keeps byte offsets aligned with the original text.
    let lowered = sql.to_ascii_lowercase();
    let bytes = lowered.as_bytes();

    // `<col> :: <type>`
    let postfix_cast = JSONB_COLUMNS.iter().any(|column| {
        lowered.match_indices(*column).any(|(begin, hit)| {
            let end = begin + hit.len();
            let left_bounded = begin == 0 || !is_ident(bytes[begin - 1]);
            let right_bounded = end == bytes.len() || !is_ident(bytes[end]);
            left_bounded && right_bounded && lowered[end..].trim_start().starts_with("::")
        })
    });
    if postfix_cast {
        return true;
    }

    // `CAST(<qualifier.>col AS <type>`
    lowered.match_indices(CAST).any(|(begin, hit)| {
        // `cast` glued to a preceding identifier character is another name.
        if begin > 0 && is_ident(bytes[begin - 1]) {
            return false;
        }
        let Some(inside) = lowered[begin + hit.len()..]
            .trim_start()
            .strip_prefix('(')
        else {
            return false;
        };
        let operand = strip_qualifier(inside.trim_start());
        JSONB_COLUMNS.iter().any(|column| {
            operand.strip_prefix(*column).is_some_and(|after| {
                let whole_identifier =
                    !after.starts_with(|c: char| c.is_ascii_alphanumeric() || c == '_');
                whole_identifier
                    && after
                        .trim_start()
                        .strip_prefix("as")
                        .is_some_and(|rest| rest.starts_with(char::is_whitespace))
            })
        })
    })
}
446
/// Plan-time gate for the one substring shape that reliably exhausts the
/// wall-clock cap: a leading-wildcard LIKE/ILIKE over the *whole-document*
/// stringify of a binary JSONB column -
/// `json_extract(variant_data|options, '$') LIKE '%...%'`.
///
/// That shape materializes every row's entire JSONB blob just to
/// substring-scan it, and the leading `%` defeats every index; over parts
/// (>1M rows) it does not finish, even scoped to a day. A single-field
/// extract (`'$.name'`) or any non-leading pattern is left to run - only the
/// whole-document shape is rejected, so the agent gets the indexed path in
/// milliseconds instead of a timeout. This is a token-scan heuristic in the
/// spirit of `jsonb_cast_misuse`; the timeout message remains the backstop
/// for anything that slips through.
/// TODO(#47): lance v8's FM-Index gives raw-byte substring search
/// (`contains(variant_data, 'needle')`); retire this gate once it lands.
fn jsonb_fulldoc_like_scan(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const NEEDLE: &str = "json_extract";

    /// Byte that may appear inside an identifier.
    fn is_ident(byte: u8) -> bool {
        byte == b'_' || byte.is_ascii_alphanumeric()
    }

    /// Given the text right after a `json_extract` name, return what follows
    /// the call's closing paren - but only for the exact whole-document form
    /// `( [qualifier.]col , '$' )`. A single-field extract (`'$.name'`) is
    /// fine and must keep running, so it yields `None`.
    fn after_fulldoc_extract(after_name: &str) -> Option<&str> {
        let args = after_name.trim_start().strip_prefix('(')?.trim_start();
        // optional `qualifier.`
        let operand = match args.find('.') {
            Some(dot) if dot > 0 && args.as_bytes()[..dot].iter().all(|b| is_ident(*b)) => {
                &args[dot + 1..]
            }
            _ => args,
        };
        let column = JSONB_COLUMNS
            .iter()
            .find(|candidate| operand.starts_with(**candidate))?;
        operand[column.len()..]
            .trim_start()
            .strip_prefix(',')?
            .trim_start()
            .strip_prefix("'$'")?
            .trim_start()
            .strip_prefix(')')
    }

    /// Does the text after the extract read `[)...] [NOT] LIKE|ILIKE '%...`?
    fn is_leading_wildcard_like(after_call: &str) -> bool {
        // Step past any wrapper close-parens (`lower(...)`/`upper(...)`).
        let mut tail = after_call.trim_start();
        while let Some(next) = tail.strip_prefix(')') {
            tail = next.trim_start();
        }
        if let Some(next) = tail.strip_prefix("not") {
            if next.starts_with(char::is_whitespace) {
                tail = next.trim_start();
            }
        }
        ["like", "ilike"].iter().any(|op| {
            tail.strip_prefix(*op).is_some_and(|next| {
                next.starts_with(char::is_whitespace) && next.trim_start().starts_with("'%")
            })
        })
    }

    // ASCII lowercasing keeps byte offsets aligned with the original text.
    let lowered = sql.to_ascii_lowercase();
    let bytes = lowered.as_bytes();
    lowered.match_indices(NEEDLE).any(|(begin, hit)| {
        // `json_extract` glued to a preceding identifier character is
        // another function name.
        let standalone = begin == 0 || !is_ident(bytes[begin - 1]);
        standalone
            && after_fulldoc_extract(&lowered[begin + hit.len()..])
                .is_some_and(is_leading_wildcard_like)
    })
}
521
522fn build_context() -> Result<SessionContext, SqlError> {
523    let runtime = RuntimeEnvBuilder::new()
524        .with_memory_limit(MEM_LIMIT_BYTES, 1.0)
525        .build_arc()
526        .map_err(|error| SqlError::Infra(anyhow!("datafusion runtime init failed: {error}")))?;
527    // information_schema is the standard self-discovery path (SELECT ... FROM
528    // information_schema.columns); agents reach for it before any doc.
529    let state = SessionStateBuilder::new()
530        .with_config(SessionConfig::new().with_information_schema(true))
531        .with_runtime_env(runtime)
532        .with_default_features()
533        .build();
534    Ok(SessionContext::new_with_state(state))
535}
536
/// Plan-time key rename for `table`, if it has one.
///
/// Each table's storage `id` is exposed under a self-describing name so the
/// same value never changes name between tables - agents copy column names
/// across queries. This one lookup drives both the registered views and
/// fts() output, so the two cannot diverge. Tables without an entry return
/// `None`.
fn renamed_key(table: &str) -> Option<&'static str> {
    const RENAMES: [(&str, &str); 2] = [("messages", "message_id"), ("sessions", "session_id")];
    RENAMES
        .iter()
        .find(|(name, _)| *name == table)
        .map(|(_, key)| *key)
}
548
549fn register(ctx: &SessionContext, tables: &Tables) -> Result<(), SqlError> {
550    for (name, dataset) in [
551        ("sessions", &tables.sessions),
552        ("messages", &tables.messages),
553    ] {
554        // LanceTableProvider (not the bare Dataset impl) so WHERE/projection/
555        // limit push into Lance's indexed scan; (false, false) hides _rowid /
556        // _rowaddr from the SQL schema. The view applies `renamed_key`
557        // plan-time only; storage keeps `id`.
558        let provider = LanceTableProvider::new(dataset.clone(), false, false);
559        let key = renamed_key(name).unwrap_or("id");
560        let view = renamed_view(name, Arc::new(provider), "id", key)
561            .map_err(|error| SqlError::Infra(anyhow!("build {name} view: {error}")))?;
562        ctx.register_table(name, Arc::new(view))
563            .map_err(|error| SqlError::Infra(anyhow!("register table {name}: {error}")))?;
564    }
565    // `parts` hides the `data` blob column behind a projecting view: blob
566    // columns scan as `{position, size}` descriptor structs, so any SQL touch
567    // dies in the planner with an opaque CAST error. The view inlines at plan
568    // time - filters still push into the Lance scan underneath.
569    let provider = LanceTableProvider::new(tables.parts.clone(), false, false);
570    let keep: Vec<_> = tables
571        .parts
572        .schema()
573        .fields
574        .iter()
575        .filter(|field| field.name != "data")
576        .map(|field| col(field.name.as_str()))
577        .collect();
578    let plan = LogicalPlanBuilder::scan("parts", provider_as_source(Arc::new(provider)), None)
579        .and_then(|builder| builder.project(keep))
580        .and_then(LogicalPlanBuilder::build)
581        .map_err(|error| SqlError::Infra(anyhow!("build parts view: {error}")))?;
582    ctx.register_table("parts", Arc::new(ViewTable::new(plan, None)))
583        .map_err(|error| SqlError::Infra(anyhow!("register table parts: {error}")))?;
584    // `fts('messages', '{...}')` BM25 search-in-SQL (vendored provider with a
585    // declared `_score` column - see `ScoredFtsUdtf`), and lance's JSON /
586    // contains_tokens UDFs for filtering inside the JSON columns.
587    let fts = ScoredFtsUdtf {
588        datasets: HashMap::from([
589            ("sessions".to_owned(), tables.sessions.clone()),
590            ("messages".to_owned(), tables.messages.clone()),
591            ("parts".to_owned(), tables.parts.clone()),
592        ]),
593    };
594    ctx.register_udtf("fts", Arc::new(fts));
595    register_functions(ctx);
596    // Shadow lance's strict json_get_* by name: the strict versions abort the
597    // whole scan when any row's field is non-scalar (e.g. tool_result `result`
598    // arrays), turning one polymorphic value into a dead query.
599    for udf in lenient_json_udfs() {
600        ctx.register_udf(udf);
601    }
602    // `any_value` (Postgres 16 / DuckDB / BigQuery - agents reach for it)
603    // doesn't exist in DataFusion 53; alias first_value, which satisfies the
604    // same contract (any_value promises no ordering, so first-encountered is
605    // a valid answer). register_udaf indexes aliases.
606    if let Some(first_value) = ctx.state().aggregate_functions().get("first_value") {
607        ctx.register_udaf(first_value.as_ref().clone().with_aliases(["any_value"]));
608    }
609    // `fts` as a *scalar* exists only to fail at plan time with the correction:
610    // agents pattern-match FTS into WHERE (MySQL MATCH / Postgres @@ priors)
611    // and DataFusion's stock error is "Did you mean 'cos'?". Scalar and
612    // table-function registries are separate namespaces, so the real fts()
613    // UDTF in FROM position is unaffected.
614    ctx.register_udf(ScalarUDF::new_from_impl(FtsMisuse::new()));
615    Ok(())
616}
617
618/// Wrap `provider` in a view projecting every column, with `from` renamed to
619/// `to`. The view inlines at plan time, so filters and projections still push
620/// into the underlying Lance scan.
621fn renamed_view(
622    scan_name: &str,
623    provider: Arc<dyn TableProvider>,
624    from: &str,
625    to: &str,
626) -> Result<ViewTable, DataFusionError> {
627    let projection: Vec<_> = provider
628        .schema()
629        .fields()
630        .iter()
631        .map(|field| {
632            let column = col(field.name().as_str());
633            if field.name() == from {
634                column.alias(to)
635            } else {
636                column
637            }
638        })
639        .collect();
640    let plan = LogicalPlanBuilder::scan(scan_name, provider_as_source(provider), None)?
641        .project(projection)?
642        .build()?;
643    Ok(ViewTable::new(plan, None))
644}
645
646const FTS_MISUSE: &str = "fts is a table function and goes in FROM, not in WHERE or the \
647    projection. For filtering use WHERE contains_tokens(search_text, 'word1 word2') (all \
648    words must match; index-accelerated). For ranked results: SELECT m.message_id, f._score \
649    FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') f \
650    JOIN messages m ON m.message_id = f.message_id ORDER BY f._score DESC.";
651
652/// See the registration comment: a plan-time teaching error for `WHERE fts(...)`.
653#[derive(Debug, PartialEq, Eq, Hash)]
654struct FtsMisuse {
655    signature: Signature,
656}
657
658impl FtsMisuse {
659    fn new() -> Self {
660        Self {
661            signature: Signature::variadic_any(Volatility::Immutable),
662        }
663    }
664}
665
666impl ScalarUDFImpl for FtsMisuse {
667    fn as_any(&self) -> &dyn std::any::Any {
668        self
669    }
670
671    fn name(&self) -> &str {
672        "fts"
673    }
674
675    fn signature(&self) -> &Signature {
676        &self.signature
677    }
678
679    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
680        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
681    }
682
683    fn invoke_with_args(
684        &self,
685        _args: ScalarFunctionArgs,
686    ) -> Result<ColumnarValue, DataFusionError> {
687        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
688    }
689}
690
691/// Vendored replacement for lance's `FtsQueryUDTF` (lance-7.0.0
692/// src/dataset/udtf.rs). The upstream provider omits `_score` from its
693/// declared schema while leaving the scanner's scoring autoprojection on, so
694/// `_score` is physically appended but logically unknown: naming it in SQL
695/// fails ("No field named _score") and any aggregate over fts() dies on
696/// DataFusion's physical-vs-logical schema check (COUNT plans 0 columns,
697/// receives 1). This provider declares `_score` as a regular nullable Float32
698/// column, projects it explicitly, and disables the autoprojection - which is
699/// also lance's documented intended end state for score columns
700/// (scanner.rs "_score/_distance should become regular output columns").
701/// Delete once fixed upstream.
702#[derive(Debug)]
703struct ScoredFtsUdtf {
704    datasets: HashMap<String, Arc<Dataset>>,
705}
706
707impl TableFunctionImpl for ScoredFtsUdtf {
708    fn call(
709        &self,
710        expr: &[Expr],
711    ) -> Result<Arc<dyn TableProvider>, lance::deps::datafusion::error::DataFusionError> {
712        let [table_expr, query_expr] = expr else {
713            return Err(DataFusionError::Execution(
714                "fts() takes (table_name, fts_query_json)".to_owned(),
715            ));
716        };
717        let Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) = table_expr else {
718            return Err(DataFusionError::Execution(
719                "fts() first argument must be a table name string".to_owned(),
720            ));
721        };
722        let Expr::Literal(ScalarValue::Utf8(Some(fts_query)), _) = query_expr else {
723            return Err(DataFusionError::Execution(
724                "fts() second argument must be the fts query as a JSON string".to_owned(),
725            ));
726        };
727        let dataset = self.datasets.get(table_name).ok_or_else(|| {
728            DataFusionError::Execution(format!("fts(): table {table_name} not found"))
729        })?;
730        let mut full_schema = Schema::from(dataset.schema());
731        full_schema = full_schema
732            .try_with_column(Field::new(SCORE_COLUMN, DataType::Float32, true))
733            .map_err(|error| DataFusionError::ArrowError(Box::new(error), None))?;
734        let provider: Arc<dyn TableProvider> = Arc::new(ScoredFtsProvider {
735            dataset: dataset.clone(),
736            fts_query: FullTextSearchQuery::new_query(from_json(fts_query)?),
737            full_schema: Arc::new(full_schema),
738        });
739        // Same `renamed_key` as the registered views, so fts() output joins
740        // without a name switch.
741        match renamed_key(table_name) {
742            Some(key) => Ok(Arc::new(renamed_view("fts", provider, "id", key)?)),
743            None => Ok(provider),
744        }
745    }
746}
747
/// Name of the relevance-score column that `ScoredFtsUdtf::call` appends to
/// the dataset schema as a nullable Float32 field.
const SCORE_COLUMN: &str = "_score";
749
/// Table provider behind one `fts(table, query)` call: a full-text search
/// over `dataset` whose schema includes the score column.
#[derive(Debug)]
struct ScoredFtsProvider {
    /// Dataset the scan runs against.
    dataset: Arc<Dataset>,
    /// Parsed full-text query; cloned into the scanner on every scan.
    fts_query: FullTextSearchQuery,
    /// Dataset schema plus `SCORE_COLUMN`. Reported as the table schema and
    /// used to turn projection indices into column names.
    full_schema: SchemaRef,
}
756
757#[async_trait::async_trait]
758impl TableProvider for ScoredFtsProvider {
759    fn as_any(&self) -> &dyn std::any::Any {
760        self
761    }
762
763    fn schema(&self) -> SchemaRef {
764        self.full_schema.clone()
765    }
766
767    fn table_type(&self) -> TableType {
768        TableType::Temporary
769    }
770
771    async fn scan(
772        &self,
773        _state: &dyn Session,
774        projection: Option<&Vec<usize>>,
775        filters: &[Expr],
776        limit: Option<usize>,
777    ) -> Result<Arc<dyn ExecutionPlan>, lance::deps::datafusion::error::DataFusionError> {
778        let mut scan = self.dataset.scan();
779        scan.full_text_search(self.fts_query.clone())?;
780        // `_score` is a declared column projected explicitly below; with the
781        // autoprojection off, the physical batch always matches the logical
782        // plan (the mismatch is what breaks aggregates upstream).
783        scan.disable_scoring_autoprojection();
784        match projection {
785            Some(projection) if projection.is_empty() => {
786                scan.empty_project()?;
787            }
788            Some(projection) => {
789                let columns: Vec<&str> = projection
790                    .iter()
791                    .map(|idx| self.full_schema.field(*idx).name().as_str())
792                    .collect();
793                scan.project(&columns)?;
794            }
795            None => {
796                let columns: Vec<&str> = self
797                    .full_schema
798                    .fields()
799                    .iter()
800                    .map(|field| field.name().as_str())
801                    .collect();
802                scan.project(&columns)?;
803            }
804        }
805        if let Some(combined) = filters
806            .iter()
807            .cloned()
808            .reduce(|left, right| left.and(right))
809        {
810            scan.filter_expr(combined);
811        }
812        scan.limit(limit.map(|l| l as i64), None)?;
813        scan.create_plan().await.map_err(DataFusionError::from)
814    }
815}
816
/// The four scalar shapes the lenient JSON getters produce.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum JsonGet {
    /// `json_get_string`: Utf8 output; non-string values are rendered as
    /// JSON text rather than rejected.
    Text,
    /// `json_get_int`: Int64 output, NULL when the value does not convert.
    Int,
    /// `json_get_float`: Float64 output, NULL when the value does not convert.
    Float,
    /// `json_get_bool`: Boolean output, NULL when the value does not convert.
    Bool,
}
825
/// Deepest key path the lenient getters accept; deeper nesting is what
/// json_extract's JSONPath is for. `json_key_path_signature` declares one
/// exact signature per key count from 1 up to this value.
const MAX_JSON_KEYS: usize = 6;
829
830/// Lenient replacements for lance's `json_get_string` / `_int` / `_float` /
831/// `_bool`. The strict originals call jsonb's exact converters and turn one
832/// non-scalar field value into a query-wide abort ("Failed to convert to
833/// string: InvalidCast"). Lenient semantics: a string getter serializes
834/// objects/arrays to JSON text; the typed getters return NULL on a
835/// non-coercible value. Unlike lance's one-key originals they take a variadic
836/// key path - `json_get_string(col, 'a', 'b')` - the datafusion-functions-json
837/// convention agents reach for first. Registered after `register_functions`
838/// so they shadow by name.
839fn lenient_json_udfs() -> [ScalarUDF; 4] {
840    let make = |name: &'static str, kind: JsonGet, return_type: DataType| {
841        ScalarUDF::new_from_impl(LenientJsonGet {
842            name,
843            kind,
844            return_type,
845            signature: json_key_path_signature(),
846        })
847    };
848    [
849        make("json_get_string", JsonGet::Text, DataType::Utf8),
850        make("json_get_int", JsonGet::Int, DataType::Int64),
851        make("json_get_float", JsonGet::Float, DataType::Float64),
852        make("json_get_bool", JsonGet::Bool, DataType::Boolean),
853    ]
854}
855
856/// `(LargeBinary, Utf8)` through `(LargeBinary, Utf8 x MAX_JSON_KEYS)`.
857fn json_key_path_signature() -> Signature {
858    let arities = (1..=MAX_JSON_KEYS)
859        .map(|keys| {
860            let mut types = vec![DataType::LargeBinary];
861            types.extend(std::iter::repeat_n(DataType::Utf8, keys));
862            TypeSignature::Exact(types)
863        })
864        .collect();
865    Signature::one_of(arities, Volatility::Immutable)
866}
867
/// See [`lenient_json_udfs`].
#[derive(Debug, PartialEq, Eq, Hash)]
struct LenientJsonGet {
    /// SQL-visible function name, e.g. `json_get_string`.
    name: &'static str,
    /// Which scalar shape this getter produces.
    kind: JsonGet,
    /// Arrow type reported to the planner; set alongside `kind` by
    /// `lenient_json_udfs`.
    return_type: DataType,
    /// Accepted argument lists, built by `json_key_path_signature`.
    signature: Signature,
}
876
// Thin trait adapter: every method returns a stored field, except invocation,
// which forwards to `json_get_lenient` with this getter's `kind`.
impl ScalarUDFImpl for LenientJsonGet {
    /// Standard downcast hook required by the trait.
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// The SQL-visible function name this instance was built with.
    fn name(&self) -> &str {
        self.name
    }

    /// The stored key-path signature.
    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Fixed per getter; the argument types play no part.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Ok(self.return_type.clone())
    }

    /// Evaluates the getter over the supplied argument columns.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue, DataFusionError> {
        json_get_lenient(&args.args, &self.kind)
    }
}
898
899/// One step of the key walk: object member by name, array element by index.
900fn json_step(raw: jsonb::RawJsonb<'_>, key: &str) -> Option<jsonb::OwnedJsonb> {
901    let value = if raw.is_object().unwrap_or(false) {
902        raw.get_by_name(key, false).ok().flatten()
903    } else if raw.is_array().unwrap_or(false) {
904        key.parse::<usize>()
905            .ok()
906            .and_then(|index| raw.get_by_index(index).ok().flatten())
907    } else {
908        None
909    };
910    value.filter(|value| !value.as_raw().is_null().unwrap_or(false))
911}
912
913fn json_get_lenient(
914    args: &[ColumnarValue],
915    kind: &JsonGet,
916) -> Result<ColumnarValue, DataFusionError> {
917    let arrays = ColumnarValue::values_to_arrays(args)?;
918    let Some((jsonb_arg, key_args)) = arrays.split_first().filter(|(_, keys)| !keys.is_empty())
919    else {
920        return Err(DataFusionError::Execution(
921            "json_get_* takes (json_column, 'key', ...) - at least one key".to_owned(),
922        ));
923    };
924    let jsonb_array = jsonb_arg
925        .as_any()
926        .downcast_ref::<LargeBinaryArray>()
927        .ok_or_else(|| {
928            DataFusionError::Execution(
929                "json_get_* argument 1 must be a JSON column (variant_data, options)".to_owned(),
930            )
931        })?;
932    let key_arrays: Vec<&StringArray> = key_args
933        .iter()
934        .map(|key_arg| {
935            key_arg
936                .as_any()
937                .downcast_ref::<StringArray>()
938                .ok_or_else(|| {
939                    DataFusionError::Execution("json_get_* keys must be string literals".to_owned())
940                })
941        })
942        .collect::<Result<_, _>>()?;
943
944    let field = |row: usize| -> Option<jsonb::OwnedJsonb> {
945        if jsonb_array.is_null(row) {
946            return None;
947        }
948        let mut keys = key_arrays.iter();
949        let first = keys.next()?;
950        if first.is_null(row) {
951            return None;
952        }
953        let mut current = json_step(
954            jsonb::RawJsonb::new(jsonb_array.value(row)),
955            first.value(row),
956        )?;
957        for key_array in keys {
958            if key_array.is_null(row) {
959                return None;
960            }
961            current = json_step(current.as_raw(), key_array.value(row))?;
962        }
963        Some(current)
964    };
965
966    let rows = jsonb_array.len();
967    let array: Arc<dyn Array> = match kind {
968        JsonGet::Text => {
969            let mut builder = StringBuilder::with_capacity(rows, 1024);
970            for row in 0..rows {
971                match field(row) {
972                    // Scalar strings come back unquoted; objects/arrays/
973                    // numbers serialize to JSON text instead of erroring.
974                    Some(value) => match value.as_raw().to_str() {
975                        Ok(text) => builder.append_value(text),
976                        Err(_) => builder.append_value(value.to_string()),
977                    },
978                    None => builder.append_null(),
979                }
980            }
981            Arc::new(builder.finish())
982        }
983        JsonGet::Int => {
984            let mut builder = Int64Builder::with_capacity(rows);
985            for row in 0..rows {
986                builder.append_option(field(row).and_then(|value| value.as_raw().to_i64().ok()));
987            }
988            Arc::new(builder.finish())
989        }
990        JsonGet::Float => {
991            let mut builder = Float64Builder::with_capacity(rows);
992            for row in 0..rows {
993                builder.append_option(field(row).and_then(|value| value.as_raw().to_f64().ok()));
994            }
995            Arc::new(builder.finish())
996        }
997        JsonGet::Bool => {
998            let mut builder = BooleanBuilder::with_capacity(rows);
999            for row in 0..rows {
1000                builder.append_option(field(row).and_then(|value| value.as_raw().to_bool().ok()));
1001            }
1002            Arc::new(builder.finish())
1003        }
1004    };
1005    Ok(ColumnarValue::Array(array))
1006}
1007
/// Turns a raw error message into one that also says what to try next.
///
/// `HINTS` pairs a substring of a DataFusion/lance error class agents
/// actually hit with recovery advice. The first entry whose substring occurs
/// in `message` is used and the result is `"{message}\nhint: {advice}"`; a
/// message that matches nothing is returned unchanged.
fn enrich(message: &str) -> String {
    const HINTS: &[(&str, &str)] = &[
        (
            "No field named",
            "columns are messages(session_id, message_id, timestamp, role, source_agent, \
             project, content [system-role only], search_text [the conversational text], \
             embedding_model, options) | sessions(session_id, parent_session_id, \
             parent_message_id, source_agent, created_at, project, options) | \
             parts(session_id, message_id, id, ordinal, type, provenance, variant_data, \
             options). Part bodies (tool params/results, text) live in parts.variant_data - \
             read them with json_extract(variant_data, '$.field'). For text search use \
             contains_tokens(search_text, '...') in WHERE, or the fts('messages', ...) \
             table function in FROM for ranked results; to read a transcript use pond_get. \
             Full doc: resource schema://pond-sql.",
        ),
        (
            "Encountered non UTF-8 data",
            "JSON columns (variant_data, options) are binary JSONB - CAST / ::text does not \
             work on them. Stringify the whole value with json_extract(col, '$'), or fetch \
             one field with json_extract(col, '$.field').",
        ),
        (
            "Resources exhausted",
            "the query ran out of memory - usually from carrying whole JSON columns \
             (variant_data, options) through a join or sort. Project narrow fields with \
             json_extract(col, '$.field') instead of whole columns, filter before joining, \
             or export the full set with format=parquet.",
        ),
        (
            "LIKE prefix queries are not supported for bitmap indexes",
            "prefix LIKE ('x%') and starts_with() fail on bitmap-indexed columns \
             (messages.source_agent, messages.role). Use equality, \
             split_part(source_agent, '/', 1) = '...', or an infix pattern (LIKE '%x%').",
        ),
        (
            "call to 'json_",
            "JSON function signatures: json_get_string|json_get_int|json_get_float|\
             json_get_bool(col, 'key', ...) walk a key path (array steps by numeric \
             index); json_get(col, 'key') returns JSONB for chaining; json_extract(col, \
             '$.a.b') takes a JSONPath and returns JSON text of any value (the right tool \
             for deeply nested or mixed-type fields).",
        ),
        (
            "Invalid function 'json",
            "available JSON functions: json_get_string, json_get_int, json_get_float, \
             json_get_bool (col, 'key', ...); json_get(col, 'key') -> JSONB for chaining; \
             json_extract(col, '$.a.b') -> JSON text; json_array_contains; \
             json_array_length. See resource schema://pond-sql.",
        ),
        (
            // Defensive: lance's fts `boolean` query can plan a CollectLeft
            // HashJoin over multi-partition match arms, which the optimizer
            // does not always repair (works through pond's vendored fts()
            // provider; kept for any path that still trips it).
            "does not satisfy distribution requirements",
            "this fts query shape planned an unexecutable join. For AND semantics use a \
             single match query with operator And: fts('messages', \
             '{\"match\":{\"column\":\"search_text\",\"terms\":\"a b\",\"operator\":\"And\"}}'), \
             optionally with LIKE post-filters in WHERE.",
        ),
        (
            "position is not found but required for phrase queries",
            "the full-text index is built without positions, so \"phrase\" queries are \
             unavailable. Use a match query with operator And plus LIKE post-filters for \
             exact-substring matching.",
        ),
    ];

    // Table order is priority order: the earliest matching entry wins.
    match HINTS.iter().find(|(needle, _)| message.contains(*needle)) {
        Some((_, advice)) => format!("{message}\nhint: {advice}"),
        None => message.to_owned(),
    }
}
1085
1086/// Decode lance JSONB columns to JSON text, then drop columns that don't render
1087/// readably (the embedding `vector` FixedSizeList and any leftover binary).
1088fn displayable(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
1089    let decoded = lance_arrow::json::convert_lance_json_to_arrow(batch)?;
1090    let keep: Vec<usize> = decoded
1091        .schema()
1092        .fields()
1093        .iter()
1094        .enumerate()
1095        .filter(|(_, field)| is_displayable(field.data_type()))
1096        .map(|(index, _)| index)
1097        .collect();
1098    decoded.project(&keep)
1099}
1100
1101fn is_displayable(data_type: &DataType) -> bool {
1102    !matches!(
1103        data_type,
1104        DataType::FixedSizeList(_, _)
1105            | DataType::Binary
1106            | DataType::LargeBinary
1107            | DataType::BinaryView
1108            | DataType::FixedSizeBinary(_)
1109    )
1110}
1111
1112/// One physical line per row: embedded newlines in cell values (markdown,
1113/// multi-line commands) otherwise explode a row across many table lines that
1114/// hard-wrap unreadably in narrow clients. The literal two-char `\n` matches
1115/// the JSON escaping agents already read, and keeps row boundaries
1116/// unambiguous. Inline table mode only - json and export modes keep raw data.
1117fn collapse_newlines(batches: &[RecordBatch]) -> Result<Vec<RecordBatch>, ArrowError> {
1118    fn escape<O: OffsetSizeTrait>(array: &GenericStringArray<O>) -> ArrayRef {
1119        let escaped: GenericStringArray<O> =
1120            array.iter().map(|value| value.map(escape_cell)).collect();
1121        Arc::new(escaped)
1122    }
1123    fn escape_cell(text: &str) -> std::borrow::Cow<'_, str> {
1124        if text.contains(['\n', '\r']) {
1125            std::borrow::Cow::Owned(text.replace("\r\n", "\\n").replace(['\n', '\r'], "\\n"))
1126        } else {
1127            std::borrow::Cow::Borrowed(text)
1128        }
1129    }
1130    batches
1131        .iter()
1132        .map(|batch| {
1133            let columns: Vec<ArrayRef> = batch
1134                .columns()
1135                .iter()
1136                .map(|array| match array.data_type() {
1137                    DataType::Utf8 => array
1138                        .as_any()
1139                        .downcast_ref::<StringArray>()
1140                        .map_or_else(|| array.clone(), escape),
1141                    DataType::LargeUtf8 => array
1142                        .as_any()
1143                        .downcast_ref::<GenericStringArray<i64>>()
1144                        .map_or_else(|| array.clone(), escape),
1145                    DataType::Utf8View => array
1146                        .as_any()
1147                        .downcast_ref::<StringViewArray>()
1148                        .map_or_else(
1149                            || array.clone(),
1150                            |view| {
1151                                let escaped: StringViewArray =
1152                                    view.iter().map(|value| value.map(escape_cell)).collect();
1153                                Arc::new(escaped)
1154                            },
1155                        ),
1156                    _ => array.clone(),
1157                })
1158                .collect();
1159            RecordBatch::try_new(batch.schema(), columns)
1160        })
1161        .collect()
1162}
1163
1164fn render_inline(
1165    display: &[RecordBatch],
1166    max_rows: usize,
1167    elapsed: Duration,
1168) -> Result<String, ArrowError> {
1169    let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1170    let elapsed_ms = elapsed.as_millis();
1171    if total == 0 {
1172        // Still render the header so the caller sees the result columns.
1173        return Ok(format!(
1174            "0 rows ({elapsed_ms} ms).\n{}",
1175            pretty_format_batches(display)?
1176        ));
1177    }
1178    let render = |shown: usize| -> Result<String, ArrowError> {
1179        let limited = collapse_newlines(&limit_batches(display, shown))?;
1180        Ok(pretty_format_batches(&limited)?.to_string())
1181    };
1182    let mut shown = total.min(max_rows);
1183    let mut table = render(shown)?;
1184    while table.len() > INLINE_BUDGET_BYTES && shown > 1 {
1185        shown = (shown / 2).max(1);
1186        table = render(shown)?;
1187    }
1188    let mut out = format!("{total} row(s) in {elapsed_ms} ms; showing {shown}.\n{table}");
1189    if shown < total {
1190        out.push_str(&format!(
1191            "\n... {} row(s) omitted. To page: ORDER BY <indexed col> (e.g. timestamp, \
1192             message_id), then in the next call add `WHERE (col, message_id) < \
1193             (<last_col>, <last_message_id>)` - keyset pagination, see schema://pond-sql. \
1194             For the full set: format=parquet or format=ndjson.",
1195            total - shown
1196        ));
1197    }
1198    Ok(out)
1199}
1200
1201/// JSON sibling of `render_inline`: same row cap and byte-budget shrinking,
1202/// returned as a `JsonValue` so the MCP layer can hand it to
1203/// `CallToolResult::structured` (text fallback + structured channel in one
1204/// call, see [`Mode::InlineJson`]).
1205fn render_inline_json(
1206    display: &[RecordBatch],
1207    max_rows: usize,
1208    elapsed: Duration,
1209) -> Result<JsonValue, SqlError> {
1210    let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1211    let columns: Vec<String> = display
1212        .first()
1213        .map(|batch| {
1214            batch
1215                .schema()
1216                .fields()
1217                .iter()
1218                .map(|field| field.name().clone())
1219                .collect()
1220        })
1221        .unwrap_or_default();
1222    let elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
1223
1224    if total == 0 {
1225        return Ok(json!({
1226            "total_rows": 0,
1227            "shown_rows": 0,
1228            "truncated": false,
1229            "elapsed_ms": elapsed_ms,
1230            "columns": columns,
1231            "rows": [],
1232        }));
1233    }
1234
1235    let mut shown = total.min(max_rows);
1236    let mut rows = batches_to_json_rows(&limit_batches(display, shown))?;
1237    let mut serialized = serde_json::to_string(&rows)
1238        .map_err(|error| SqlError::Infra(anyhow!("json serialize: {error}")))?;
1239    while serialized.len() > INLINE_BUDGET_BYTES && shown > 1 {
1240        shown = (shown / 2).max(1);
1241        rows = batches_to_json_rows(&limit_batches(display, shown))?;
1242        serialized = serde_json::to_string(&rows)
1243            .map_err(|error| SqlError::Infra(anyhow!("json serialize: {error}")))?;
1244    }
1245
1246    let mut payload = JsonMap::new();
1247    payload.insert("total_rows".to_owned(), json!(total));
1248    payload.insert("shown_rows".to_owned(), json!(shown));
1249    payload.insert("truncated".to_owned(), json!(shown < total));
1250    payload.insert("elapsed_ms".to_owned(), json!(elapsed_ms));
1251    payload.insert("columns".to_owned(), json!(columns));
1252    payload.insert("rows".to_owned(), JsonValue::Array(rows));
1253    if shown < total {
1254        payload.insert(
1255            "next_steps".to_owned(),
1256            json!(format!(
1257                "{} row(s) omitted; ORDER BY + keyset (`WHERE (col, message_id) < \
1258                 (<last_col>, <last_message_id>)`) to page, or format=parquet|ndjson for \
1259                 the full set. See schema://pond-sql.",
1260                total - shown
1261            )),
1262        );
1263    }
1264    Ok(JsonValue::Object(payload))
1265}
1266
1267/// Convert RecordBatches to a JSON array of row objects via the existing
1268/// NDJSON writer (handles all Arrow types, including the decoded JSON columns
1269/// that come out of `displayable`).
1270fn batches_to_json_rows(batches: &[RecordBatch]) -> Result<Vec<JsonValue>, SqlError> {
1271    if batches.iter().all(|batch| batch.num_rows() == 0) {
1272        return Ok(Vec::new());
1273    }
1274    let mut buffer = Vec::new();
1275    {
1276        let mut writer = LineDelimitedWriter::new(&mut buffer);
1277        let refs: Vec<&RecordBatch> = batches.iter().collect();
1278        writer
1279            .write_batches(&refs)
1280            .map_err(|error| SqlError::Infra(anyhow!("ndjson encode: {error}")))?;
1281        writer
1282            .finish()
1283            .map_err(|error| SqlError::Infra(anyhow!("ndjson finish: {error}")))?;
1284    }
1285    let text = String::from_utf8(buffer)
1286        .map_err(|error| SqlError::Infra(anyhow!("ndjson not utf-8: {error}")))?;
1287    text.lines()
1288        .filter(|line| !line.is_empty())
1289        .map(|line| {
1290            serde_json::from_str::<JsonValue>(line)
1291                .map_err(|error| SqlError::Infra(anyhow!("ndjson parse: {error}")))
1292        })
1293        .collect()
1294}
1295
1296fn limit_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<RecordBatch> {
1297    let mut out = Vec::new();
1298    let mut remaining = max_rows;
1299    for batch in batches {
1300        if remaining == 0 {
1301            break;
1302        }
1303        if batch.num_rows() <= remaining {
1304            remaining -= batch.num_rows();
1305            out.push(batch.clone());
1306        } else {
1307            out.push(batch.slice(0, remaining));
1308            remaining = 0;
1309        }
1310    }
1311    out
1312}
1313
1314fn encode_parquet(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1315    let schema = batches
1316        .first()
1317        .map(RecordBatch::schema)
1318        .ok_or_else(|| SqlError::Query("query returned no columns to export".to_owned()))?;
1319    let mut buffer = Vec::new();
1320    let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
1321        .map_err(|error| SqlError::Infra(anyhow!("parquet init failed: {error}")))?;
1322    for batch in batches {
1323        writer
1324            .write(batch)
1325            .map_err(|error| SqlError::Infra(anyhow!("parquet write failed: {error}")))?;
1326    }
1327    writer
1328        .close()
1329        .map_err(|error| SqlError::Infra(anyhow!("parquet close failed: {error}")))?;
1330    Ok(buffer)
1331}
1332
1333fn encode_ndjson(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1334    let mut buffer = Vec::new();
1335    {
1336        let mut writer = LineDelimitedWriter::new(&mut buffer);
1337        let refs: Vec<&RecordBatch> = batches.iter().collect();
1338        writer
1339            .write_batches(&refs)
1340            .map_err(|error| SqlError::Infra(anyhow!("ndjson write failed: {error}")))?;
1341        writer
1342            .finish()
1343            .map_err(|error| SqlError::Infra(anyhow!("ndjson finish failed: {error}")))?;
1344    }
1345    Ok(buffer)
1346}
1347
1348#[cfg(test)]
1349mod tests {
1350    #![allow(clippy::expect_used)]
1351
1352    use super::*;
1353
1354    fn rejected(sql: &str) -> bool {
1355        matches!(parse_and_gate(sql), Err(SqlError::Query(_)))
1356    }
1357
1358    fn parses_as(sql: &str, expected: StatementKind) -> bool {
1359        match parse_and_gate(sql) {
1360            Ok(parsed) => matches!(
1361                (&parsed.kind, &expected),
1362                (StatementKind::Query, StatementKind::Query)
1363                    | (StatementKind::Explain, StatementKind::Explain)
1364            ),
1365            Err(_) => false,
1366        }
1367    }
1368
1369    #[test]
1370    fn allows_single_select_and_cte() {
1371        assert!(parses_as("SELECT 1", StatementKind::Query));
1372        assert!(parses_as(
1373            "SELECT role, count(*) FROM messages GROUP BY role",
1374            StatementKind::Query
1375        ));
1376        assert!(parses_as(
1377            "WITH t AS (SELECT 1 AS a) SELECT a FROM t",
1378            StatementKind::Query
1379        ));
1380    }
1381
1382    #[test]
1383    fn allows_explain_of_select() {
1384        assert!(parses_as("EXPLAIN SELECT 1", StatementKind::Explain));
1385        assert!(parses_as(
1386            "EXPLAIN ANALYZE SELECT role FROM messages",
1387            StatementKind::Explain
1388        ));
1389    }
1390
1391    #[test]
1392    fn rejects_explain_of_non_query() {
1393        // EXPLAIN of a side-effecting statement: the inner statement is what
1394        // would matter; reject to keep the surface tight.
1395        assert!(rejected("EXPLAIN INSERT INTO messages VALUES ('x')"));
1396    }
1397
1398    #[test]
1399    fn rejects_writes_and_side_effects() {
1400        assert!(rejected("INSERT INTO messages VALUES ('x')"));
1401        assert!(rejected("UPDATE messages SET role = 'x'"));
1402        assert!(rejected("DELETE FROM messages"));
1403        assert!(rejected("CREATE TABLE t (x INT)"));
1404        assert!(rejected("CREATE VIEW v AS SELECT 1"));
1405        assert!(rejected("DROP TABLE messages"));
1406        assert!(rejected(
1407            "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION '/etc'"
1408        ));
1409        assert!(rejected("COPY (SELECT 1) TO '/tmp/x.parquet'"));
1410        assert!(rejected("SET a = 1"));
1411    }
1412
1413    #[test]
1414    fn rejects_multiple_statements() {
1415        assert!(rejected("SELECT 1; SELECT 2"));
1416        assert!(rejected("SELECT 1; DROP TABLE messages"));
1417    }
1418
1419    #[test]
1420    fn rejects_unparseable() {
1421        assert!(rejected("NOT SQL AT ALL ;;"));
1422    }
1423
1424    fn mentions_vector(sql: &str) -> bool {
1425        match parse_and_gate(sql) {
1426            Ok(parsed) => projection_mentions_vector(parsed.projection_query()),
1427            Err(_) => false,
1428        }
1429    }
1430
1431    #[test]
1432    fn explicit_vector_projection_is_rejected() {
1433        assert!(mentions_vector("SELECT vector FROM messages"));
1434        assert!(mentions_vector("SELECT id, vector FROM messages"));
1435        assert!(mentions_vector("SELECT m.vector FROM messages m"));
1436        assert!(mentions_vector("SELECT array_length(vector) FROM messages"));
1437        assert!(mentions_vector("EXPLAIN SELECT vector FROM messages"));
1438    }
1439
1440    #[test]
1441    fn enrich_appends_recovery_hints() {
1442        // One literal error string per class, captured from real failed calls.
1443        let cases = [
1444            (
1445                "SQL error: Schema error: No field named created_at.",
1446                "schema://pond-sql",
1447            ),
1448            (
1449                "SQL error: External error: Arrow error: Invalid argument error: \
1450                 Encountered non UTF-8 data",
1451                "json_extract",
1452            ),
1453            (
1454                "SQL error: External error: Not supported: LIKE prefix queries are not \
1455                 supported for bitmap indexes",
1456                "split_part",
1457            ),
1458            (
1459                "SQL error: Error during planning: Failed to coerce arguments to satisfy \
1460                 a call to 'json_get_string' function",
1461                "JSONPath",
1462            ),
1463            (
1464                "SQL error: Error during planning: Invalid function 'json_get_json'.",
1465                "json_extract",
1466            ),
1467            (
1468                "SQL error: Resources exhausted: Additional allocation failed for \
1469                 HashJoinInput[0] with top memory consumers",
1470                "json_extract",
1471            ),
1472        ];
1473        for (raw, marker) in cases {
1474            let enriched = enrich(raw);
1475            assert!(enriched.starts_with(raw), "original kept: {enriched}");
1476            assert!(enriched.contains("hint:"), "hint appended: {enriched}");
1477            assert!(enriched.contains(marker), "hint names the fix: {enriched}");
1478        }
1479        // Unrecognized errors pass through untouched.
1480        assert_eq!(
1481            enrich("SQL error: division by zero"),
1482            "SQL error: division by zero"
1483        );
1484    }
1485
1486    #[test]
1487    fn select_star_and_where_vector_are_allowed() {
1488        // `SELECT *` falls through to the existing silent-strip in displayable.
1489        assert!(!mentions_vector("SELECT * FROM messages"));
1490        // Filtering on `vector` is documented as legal (`vector IS NOT NULL`).
1491        assert!(!mentions_vector(
1492            "SELECT message_id FROM messages WHERE vector IS NOT NULL"
1493        ));
1494    }
1495
1496    #[test]
1497    fn jsonb_cast_misuse_detects_cast_and_coloncolon() {
1498        for sql in [
1499            "SELECT CAST(variant_data AS VARCHAR) FROM parts",
1500            "SELECT cast(p.variant_data as text) FROM parts p",
1501            "SELECT variant_data::text FROM parts",
1502            "SELECT p.variant_data :: varchar FROM parts p",
1503            "SELECT options::text FROM messages",
1504            "SELECT lower(CAST(variant_data AS VARCHAR)) FROM parts",
1505        ] {
1506            assert!(jsonb_cast_misuse(sql), "should reject: {sql}");
1507        }
1508    }
1509
1510    #[test]
1511    fn jsonb_cast_misuse_allows_legitimate_use() {
1512        for sql in [
1513            "SELECT json_extract(variant_data, '$') FROM parts",
1514            "SELECT json_get_string(variant_data, 'name') FROM parts",
1515            "SELECT CAST(ordinal AS BIGINT) FROM parts",
1516            "SELECT timestamp::date FROM messages",
1517            // `options` as part of a longer identifier is not the column.
1518            "SELECT my_options::text FROM t",
1519            "SELECT CAST(json_extract(variant_data, '$.x') AS BIGINT) FROM parts",
1520        ] {
1521            assert!(!jsonb_cast_misuse(sql), "should allow: {sql}");
1522        }
1523    }
1524
1525    #[test]
1526    fn jsonb_fulldoc_like_scan_detects_whole_document_substring() {
1527        for sql in [
1528            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE '%needle%'",
1529            "SELECT * FROM parts p WHERE lower(json_extract(p.variant_data, '$')) LIKE '%x%'",
1530            "SELECT * FROM messages WHERE json_extract(options, '$') ILIKE '%y%'",
1531            "SELECT * FROM parts WHERE json_extract(variant_data,'$') NOT LIKE '%z%'",
1532            // The real timeout shape: day-scoped join still scans every part.
1533            "SELECT p.message_id FROM parts p JOIN messages m ON p.message_id = m.message_id \
1534             WHERE m.timestamp >= '2026-06-11' AND lower(json_extract(p.variant_data, '$')) \
1535             LIKE '%weekly limit%'",
1536        ] {
1537            assert!(jsonb_fulldoc_like_scan(sql), "should reject: {sql}");
1538        }
1539    }
1540
1541    #[test]
1542    fn jsonb_fulldoc_like_scan_allows_targeted_and_nonleading() {
1543        for sql in [
1544            // single-field extract, not the whole document
1545            "SELECT * FROM parts WHERE json_extract(variant_data, '$.name') LIKE '%x%'",
1546            // non-leading (prefix) pattern can be served without a full stringify
1547            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE 'pre%'",
1548            // plain text LIKE has no whole-document stringify
1549            "SELECT * FROM messages WHERE search_text LIKE '%x%'",
1550            // indexed predicate, the path agents should take
1551            "SELECT * FROM messages WHERE contains_tokens(search_text, 'x')",
1552            // projecting the stringified value is fine; no LIKE scan
1553            "SELECT json_extract(variant_data, '$') FROM parts LIMIT 1",
1554        ] {
1555            assert!(!jsonb_fulldoc_like_scan(sql), "should allow: {sql}");
1556        }
1557    }
1558
1559    #[test]
1560    fn render_inline_collapses_newlines_in_cells() {
1561        let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Utf8, true)]));
1562        let batch = RecordBatch::try_new(
1563            schema,
1564            vec![Arc::new(StringArray::from(vec![Some(
1565                "line one\nline two\r\nline three",
1566            )]))],
1567        )
1568        .expect("single-column batch");
1569        let out = render_inline(&[batch], 10, Duration::from_millis(1)).expect("render succeeds");
1570        assert!(
1571            out.contains("line one\\nline two\\nline three"),
1572            "newlines collapse to literal \\n: {out}"
1573        );
1574        // The data row renders as one physical line: header rule, header,
1575        // rule, row, rule - the row itself never wraps.
1576        let row_lines: Vec<&str> = out
1577            .lines()
1578            .filter(|line| line.contains("line one"))
1579            .collect();
1580        assert_eq!(row_lines.len(), 1, "one physical line per row: {out}");
1581    }
1582}