1use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use anyhow::anyhow;
16use arrow_json::LineDelimitedWriter;
17use lance::Dataset;
18use lance::datafusion::LanceTableProvider;
19use lance::deps::arrow_array::builder::{
20 BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
21};
22use lance::deps::arrow_array::{
23 Array, ArrayRef, GenericStringArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
24 StringArray, StringViewArray,
25};
26use lance::deps::arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
27use lance::deps::datafusion::arrow::util::pretty::pretty_format_batches;
28use lance::deps::datafusion::catalog::{Session, TableFunctionImpl, TableProvider};
29use lance::deps::datafusion::common::ScalarValue;
30use lance::deps::datafusion::datasource::{ViewTable, provider_as_source};
31use lance::deps::datafusion::error::DataFusionError;
32use lance::deps::datafusion::execution::SessionStateBuilder;
33use lance::deps::datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use lance::deps::datafusion::logical_expr::{
35 ColumnarValue, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
36 TypeSignature, Volatility,
37};
38use lance::deps::datafusion::logical_expr::{Expr, TableType};
39use lance::deps::datafusion::physical_plan::ExecutionPlan;
40use lance::deps::datafusion::prelude::{SQLOptions, SessionConfig, SessionContext, col};
41use lance::deps::datafusion::sql::parser::{DFParser, Statement as DfStatement};
42use lance::deps::datafusion::sql::sqlparser::ast::{SetExpr, Statement as SqlStatement};
43use lance_arrow::SchemaExt;
44use lance_datafusion::udf::register_functions;
45use lance_index::scalar::FullTextSearchQuery;
46use lance_index::scalar::inverted::parser::from_json;
47use parquet::arrow::ArrowWriter;
48use serde_json::{Map as JsonMap, Value as JsonValue, json};
49
/// Memory limit, in bytes, given to the DataFusion runtime built for each query (512 MiB).
const MEM_LIMIT_BYTES: usize = 512 * 1024 * 1024;
/// Wall-clock cap on collecting a query's result batches.
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Size budget, in bytes, for an inline rendering; larger renderings are retried with fewer rows.
const INLINE_BUDGET_BYTES: usize = 80_000;
/// Largest encoded export, in bytes, that is returned; bigger exports are rejected.
const MAX_EXPORT_BYTES: usize = 100 * 1024 * 1024;
/// NOTE(review): not referenced in this part of the file - presumably the inline row count
/// callers use when none is requested; confirm at the call site.
pub const DEFAULT_INLINE_ROWS: usize = 100;
/// NOTE(review): not referenced in this part of the file - presumably the ceiling callers
/// apply to a requested inline row count; confirm at the call site.
pub const MAX_INLINE_ROWS: usize = 1_000;
65
/// File format used when a result set is exported instead of rendered inline.
#[derive(Debug, Clone, Copy)]
pub enum Format {
    Parquet,
    Ndjson,
}

impl Format {
    /// `(file extension, MIME type)` for this format; the single source both accessors read.
    fn descriptor(self) -> (&'static str, &'static str) {
        match self {
            Self::Parquet => ("parquet", "application/vnd.apache.parquet"),
            Self::Ndjson => ("ndjson", "application/x-ndjson"),
        }
    }

    /// File extension, without the leading dot.
    pub fn ext(self) -> &'static str {
        let (extension, _) = self.descriptor();
        extension
    }

    /// MIME type of the encoded bytes.
    pub fn mime(self) -> &'static str {
        let (_, mime_type) = self.descriptor();
        mime_type
    }
}
89
/// How `run` hands a query result back to the caller.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// Render a text table (produces `Outcome::Inline`).
    Inline,
    /// Render a JSON payload (produces `Outcome::InlineJson`).
    InlineJson,
    /// Encode the whole result in the given format (produces `Outcome::Export`).
    Export(Format),
}
105
/// The three datasets that are exposed to SQL, one per table name.
pub struct Tables {
    /// Backs the `sessions` table.
    pub sessions: Arc<Dataset>,
    /// Backs the `messages` table.
    pub messages: Arc<Dataset>,
    /// Backs the `parts` table.
    pub parts: Arc<Dataset>,
}
113
/// Result of a successful `run`, shaped by the requested `Mode`.
pub enum Outcome {
    /// Rendered text: a summary line followed by a formatted table.
    Inline(String),
    /// JSON payload built by `render_inline_json`.
    InlineJson(JsonValue),
    /// The full result set encoded for download.
    Export {
        /// Encoded result in `format`.
        bytes: Vec<u8>,
        /// Encoding used for `bytes`.
        format: Format,
        /// Total row count across the exported batches.
        rows: usize,
        /// Column names of the exported batches, in schema order.
        columns: Vec<String>,
    },
}
129
/// Failure of a SQL run, split by who can act on it.
#[derive(Debug)]
pub enum SqlError {
    /// The statement itself was rejected or failed; the message is written for the person
    /// (or agent) who submitted the SQL.
    Query(String),
    /// Something went wrong around the query (context setup, batch conversion, encoding).
    /// NOTE(review): presumably reported as an internal error by callers - confirm.
    Infra(anyhow::Error),
}
138
139fn infra(error: ArrowError) -> SqlError {
140 SqlError::Infra(anyhow::Error::new(error))
141}
142
/// Runs one read-only SQL statement against the pond tables and renders the result for `mode`.
///
/// * `tables` - the datasets exposed to SQL as `sessions`, `messages` and `parts`.
/// * `sql` - the statement text; it must be a single query, or an `EXPLAIN` of one.
/// * `mode` - how the result is returned: text table, JSON payload, or an encoded export.
/// * `inline_rows` - upper bound on rows rendered by the inline modes; unused for exports.
///
/// # Errors
///
/// Returns [`SqlError::Query`] for anything the submitter can fix by changing the statement
/// (parse errors, rejected statement shapes, planning/execution errors, the timeout, an
/// oversized export) and [`SqlError::Infra`] for failures while building the context or
/// converting and rendering batches.
pub async fn run(
    tables: &Tables,
    sql: &str,
    mode: Mode,
    inline_rows: usize,
) -> Result<Outcome, SqlError> {
    // Parse first: everything that is not exactly one query (or EXPLAIN of one) stops here.
    let parsed = parse_and_gate(sql)?;
    // An EXPLAIN produces a plan, which makes no sense as a data export.
    if matches!(parsed.kind, StatementKind::Explain) && matches!(mode, Mode::Export(_)) {
        return Err(SqlError::Query(
            "EXPLAIN returns a plan, not a result set; use format=text (or json) to read it"
                .to_owned(),
        ));
    }
    // Only the projection is inspected, so `vector` may still appear in WHERE.
    if projection_mentions_vector(parsed.projection_query()) {
        return Err(SqlError::Query(
            "the `vector` column is not selectable from pond_sql_query (it is a \
             FixedSizeList<f32> embedding, ~600 bytes per row and not useful in a result). \
             For semantic search use pond_search. Filtering on it is allowed in WHERE \
             (e.g. `vector IS NOT NULL`)."
                .to_owned(),
        ));
    }
    // The next two checks scan the raw SQL text rather than the parsed statement.
    if jsonb_cast_misuse(sql) {
        return Err(SqlError::Query(
            "CAST / `::` does not work on the binary JSONB columns (variant_data, options) - \
             when the bytes happen to be valid text it can even silently return garbage. \
             Stringify the whole value with json_extract(col, '$') or read one field with \
             json_extract(col, '$.field')."
                .to_owned(),
        ));
    }
    if jsonb_fulldoc_like_scan(sql) {
        return Err(SqlError::Query(
            "a leading-wildcard LIKE over the whole JSONB document - \
             json_extract(variant_data, '$') LIKE '%...%' - stringifies and scans every row, \
             so over parts it will not finish within the time limit. There is no substring \
             index on tool bodies yet (TODO #47: lance v8 FM-Index). Instead match a single \
             field with json_extract(variant_data, '$.field') LIKE '...', scope to one session \
             with session_id = '<id>' and read it with pond_get, or search conversational text \
             with contains_tokens(search_text, '...')."
                .to_owned(),
        ));
    }
    // A fresh context per call: memory-limited runtime plus the tables and functions.
    let ctx = build_context()?;
    register(&ctx, tables)?;

    // Second line of defence behind `parse_and_gate`: no DDL, no DML, and non-query
    // statements only when the gate already identified the statement as an EXPLAIN.
    let options = SQLOptions::new()
        .with_allow_ddl(false)
        .with_allow_dml(false)
        .with_allow_statements(matches!(parsed.kind, StatementKind::Explain));
    let df = ctx
        .sql_with_options(sql, options)
        .await
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;

    // Captured up front so an empty result can still be rendered with its column headers.
    let result_schema = Arc::new(df.schema().as_arrow().clone());
    let started = Instant::now();
    // Two error layers: the outer one is the timeout elapsing, the inner one is the query
    // itself failing; both are reported as query errors.
    let collected = tokio::time::timeout(QUERY_TIMEOUT, df.collect())
        .await
        .map_err(|_| {
            SqlError::Query(format!(
                "query exceeded the {}s limit; add a narrower WHERE or a LIMIT. If you were \
                 substring-scanning variant_data (json_extract + LIKE), there is no \
                 substring index on tool bodies yet: filter parts by type and \
                 json_get_string(variant_data, 'name') first, or search conversational \
                 text with contains_tokens(search_text, '...') instead.",
                QUERY_TIMEOUT.as_secs()
            ))
        })?
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;
    let elapsed = started.elapsed();

    // Convert every batch to its displayable form; when no batch came back, substitute one
    // empty batch with the result schema so `display` is never empty.
    let display: Vec<RecordBatch> = if collected.is_empty() {
        vec![displayable(&RecordBatch::new_empty(result_schema)).map_err(infra)?]
    } else {
        collected
            .iter()
            .map(displayable)
            .collect::<Result<_, _>>()
            .map_err(infra)?
    };

    match mode {
        Mode::Inline => Ok(Outcome::Inline(
            render_inline(&display, inline_rows, elapsed).map_err(infra)?,
        )),
        Mode::InlineJson => Ok(Outcome::InlineJson(render_inline_json(
            &display,
            inline_rows,
            elapsed,
        )?)),
        Mode::Export(format) => {
            // Row count and column names describe the displayable batches, i.e. what is
            // actually encoded below.
            let rows = display.iter().map(RecordBatch::num_rows).sum();
            let columns = display
                .first()
                .map(|batch| {
                    batch
                        .schema()
                        .fields()
                        .iter()
                        .map(|field| field.name().clone())
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default();
            let bytes = match format {
                Format::Parquet => encode_parquet(&display)?,
                Format::Ndjson => encode_ndjson(&display)?,
            };
            // The limit is checked on the encoded size, so it is enforced after encoding.
            if bytes.len() > MAX_EXPORT_BYTES {
                return Err(SqlError::Query(format!(
                    "export is {} bytes, over the {MAX_EXPORT_BYTES} byte limit; \
                     narrow the query or aggregate",
                    bytes.len()
                )));
            }
            Ok(Outcome::Export {
                bytes,
                format,
                rows,
                columns,
            })
        }
    }
}
281
/// Which of the two accepted statement shapes was submitted.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatementKind {
    /// A plain query.
    Query,
    /// An `EXPLAIN` wrapping a query.
    Explain,
}
290
/// A statement that passed `parse_and_gate`.
struct ParsedStatement {
    /// Whether the query was submitted directly or wrapped in `EXPLAIN`.
    kind: StatementKind,
    /// The query itself; for an `EXPLAIN` this is the explained query.
    query: lance::deps::datafusion::sql::sqlparser::ast::Query,
}

impl ParsedStatement {
    /// The query whose projection the column gate should inspect.
    fn projection_query(&self) -> &lance::deps::datafusion::sql::sqlparser::ast::Query {
        &self.query
    }
}
306
307fn parse_and_gate(sql: &str) -> Result<ParsedStatement, SqlError> {
314 let statements = DFParser::parse_sql(sql)
315 .map_err(|error| SqlError::Query(format!("SQL parse error: {error}")))?;
316 if statements.len() != 1 {
317 return Err(SqlError::Query(
318 "pond_sql_query runs exactly one statement; submit a single SELECT".to_owned(),
319 ));
320 }
321 let Some(front) = statements.front() else {
322 return Err(read_only_rejection());
323 };
324 match front {
325 DfStatement::Statement(boxed) => match boxed.as_ref() {
326 SqlStatement::Query(query) => Ok(ParsedStatement {
327 kind: StatementKind::Query,
328 query: query.as_ref().clone(),
329 }),
330 _ => Err(read_only_rejection()),
331 },
332 DfStatement::Explain(explain) => match explain.statement.as_ref() {
333 DfStatement::Statement(inner) => match inner.as_ref() {
334 SqlStatement::Query(query) => Ok(ParsedStatement {
335 kind: StatementKind::Explain,
336 query: query.as_ref().clone(),
337 }),
338 _ => Err(read_only_rejection()),
339 },
340 _ => Err(read_only_rejection()),
341 },
342 _ => Err(read_only_rejection()),
343 }
344}
345
346fn read_only_rejection() -> SqlError {
347 SqlError::Query(
348 "pond_sql_query is read-only: only a single SELECT/WITH (or EXPLAIN of one) is \
349 allowed (no INSERT/UPDATE/DELETE/CREATE/DROP/COPY/SET)"
350 .to_owned(),
351 )
352}
353
354fn projection_mentions_vector(query: &lance::deps::datafusion::sql::sqlparser::ast::Query) -> bool {
365 walk_set_expr_for_vector(query.body.as_ref())
366}
367
368fn walk_set_expr_for_vector(expr: &SetExpr) -> bool {
369 match expr {
370 SetExpr::Select(select) => select
371 .projection
372 .iter()
373 .any(|item| mentions_vector_token(&item.to_string())),
374 SetExpr::Query(inner) => walk_set_expr_for_vector(inner.body.as_ref()),
375 SetExpr::SetOperation { left, right, .. } => {
376 walk_set_expr_for_vector(left) || walk_set_expr_for_vector(right)
377 }
378 _ => false,
379 }
380}
381
/// Reports whether `text` contains `vector` as a standalone identifier token.
///
/// The text is split on every character that cannot be part of an identifier (anything other
/// than alphanumerics and `_`), so `vector` and `m.vector` match while `vectors` and
/// `my_vector` do not.
///
/// The comparison ignores ASCII case: unquoted SQL identifiers are case-insensitive, so
/// `SELECT VECTOR ...` names the same column as `SELECT vector ...` and must be caught too.
fn mentions_vector_token(text: &str) -> bool {
    text.split(|c: char| !c.is_alphanumeric() && c != '_')
        .any(|token| token.eq_ignore_ascii_case("vector"))
}
386
/// Detects a cast applied directly to one of the binary JSONB columns.
///
/// Works on the lower-cased SQL text and recognises two shapes:
/// `<column>::type` (whitespace allowed before `::`) and `CAST(<column> AS ...)`, where the
/// column may carry a single `qualifier.` prefix. Column and keyword matches must sit on
/// identifier boundaries, so `options_x::text` or `broadcast(options as x)` do not count.
fn jsonb_cast_misuse(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];

    fn is_ident(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    /// True when `operand` begins with a JSONB column name followed by the `AS` keyword.
    fn is_jsonb_column_then_as(operand: &str) -> bool {
        JSONB_COLUMNS.iter().any(|column| {
            operand.strip_prefix(*column).is_some_and(|after| {
                let name_continues = after.bytes().next().is_some_and(is_ident);
                !name_continues
                    && after
                        .trim_start()
                        .strip_prefix("as")
                        .is_some_and(|rest| rest.starts_with(char::is_whitespace))
            })
        })
    }

    let lowered = sql.to_ascii_lowercase();
    let bytes = lowered.as_bytes();
    // Identifier-boundary probes on either side of a match.
    let starts_token = |at: usize| at.checked_sub(1).map_or(true, |prev| !is_ident(bytes[prev]));
    let ends_token = |at: usize| bytes.get(at).map_or(true, |next| !is_ident(*next));

    // Shape 1: postfix cast, `<column> :: type`.
    let postfix_cast = JSONB_COLUMNS.iter().any(|column| {
        lowered.match_indices(*column).any(|(begin, found)| {
            let end = begin + found.len();
            starts_token(begin)
                && ends_token(end)
                && lowered[end..].trim_start().starts_with("::")
        })
    });
    if postfix_cast {
        return true;
    }

    // Shape 2: `CAST ( [qualifier.]<column> AS ...`.
    lowered.match_indices("cast").any(|(begin, keyword)| {
        if !starts_token(begin) {
            return false;
        }
        let Some(inside) = lowered[begin + keyword.len()..]
            .trim_start()
            .strip_prefix('(')
        else {
            return false;
        };
        let inside = inside.trim_start();
        // Drop one leading `alias.` when everything before the first dot is an identifier.
        let operand = match inside.split_once('.') {
            Some((qualifier, rest))
                if !qualifier.is_empty() && qualifier.bytes().all(is_ident) =>
            {
                rest
            }
            _ => inside,
        };
        is_jsonb_column_then_as(operand)
    })
}
446
/// Detects a leading-wildcard `LIKE` / `ILIKE` applied to a whole-document
/// `json_extract(<jsonb column>, '$')`.
///
/// Works on the lower-cased SQL text. The column may carry one `qualifier.` prefix, extra
/// closing parentheses may follow the call (e.g. when it is wrapped in `lower(...)`), and an
/// optional `NOT` may precede the operator. Only patterns that start with `'%` count.
fn jsonb_fulldoc_like_scan(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const NEEDLE: &str = "json_extract";

    fn is_ident(byte: u8) -> bool {
        byte.is_ascii_alphanumeric() || byte == b'_'
    }

    /// Given the text right after `json_extract`, returns what follows the closing `)` of a
    /// `( [qualifier.]<jsonb column> , '$' )` argument list, or `None` for any other shape.
    fn after_fulldoc_call(text: &str) -> Option<&str> {
        let args = text.trim_start().strip_prefix('(')?.trim_start();
        let operand = match args.split_once('.') {
            Some((qualifier, rest))
                if !qualifier.is_empty() && qualifier.bytes().all(is_ident) =>
            {
                rest
            }
            _ => args,
        };
        let column = JSONB_COLUMNS
            .iter()
            .copied()
            .find(|name| operand.starts_with(*name))?;
        let rest = operand[column.len()..].trim_start();
        let rest = rest.strip_prefix(',')?.trim_start();
        let rest = rest.strip_prefix("'$'")?.trim_start();
        let rest = rest.strip_prefix(')')?;
        Some(rest.trim_start())
    }

    /// True when `tail` reads `[)...] [NOT] LIKE|ILIKE '%...`.
    fn is_leading_wildcard_like(mut tail: &str) -> bool {
        while let Some(next) = tail.strip_prefix(')') {
            tail = next.trim_start();
        }
        if let Some(next) = tail.strip_prefix("not") {
            if next.starts_with(char::is_whitespace) {
                tail = next.trim_start();
            }
        }
        ["like", "ilike"].iter().any(|op| {
            tail.strip_prefix(*op).is_some_and(|next| {
                next.starts_with(char::is_whitespace) && next.trim_start().starts_with("'%")
            })
        })
    }

    let lowered = sql.to_ascii_lowercase();
    lowered.match_indices(NEEDLE).any(|(begin, needle)| {
        // Skip matches that are the tail of a longer identifier.
        let standalone = lowered[..begin]
            .bytes()
            .next_back()
            .map_or(true, |prev| !is_ident(prev));
        standalone
            && after_fulldoc_call(&lowered[begin + needle.len()..])
                .is_some_and(is_leading_wildcard_like)
    })
}
521
522fn build_context() -> Result<SessionContext, SqlError> {
523 let runtime = RuntimeEnvBuilder::new()
524 .with_memory_limit(MEM_LIMIT_BYTES, 1.0)
525 .build_arc()
526 .map_err(|error| SqlError::Infra(anyhow!("datafusion runtime init failed: {error}")))?;
527 let state = SessionStateBuilder::new()
530 .with_config(SessionConfig::new().with_information_schema(true))
531 .with_runtime_env(runtime)
532 .with_default_features()
533 .build();
534 Ok(SessionContext::new_with_state(state))
535}
536
/// SQL-facing name of a table's `id` column, for the tables whose key is renamed.
///
/// Returns `None` for every table that is not in the rename list.
fn renamed_key(table: &str) -> Option<&'static str> {
    const RENAMES: [(&str, &str); 2] = [("messages", "message_id"), ("sessions", "session_id")];
    RENAMES
        .iter()
        .find(|(name, _)| *name == table)
        .map(|(_, key)| *key)
}
548
/// Registers the three tables and the SQL functions on `ctx`.
///
/// Any failure while building or registering a table is returned as [`SqlError::Infra`].
fn register(ctx: &SessionContext, tables: &Tables) -> Result<(), SqlError> {
    // `sessions` and `messages` are exposed through a view that renames their `id` column
    // to the table-specific key from `renamed_key`.
    for (name, dataset) in [
        ("sessions", &tables.sessions),
        ("messages", &tables.messages),
    ] {
        // NOTE(review): the meaning of the two `false` flags is not visible here - confirm
        // against `LanceTableProvider::new`.
        let provider = LanceTableProvider::new(dataset.clone(), false, false);
        let key = renamed_key(name).unwrap_or("id");
        let view = renamed_view(name, Arc::new(provider), "id", key)
            .map_err(|error| SqlError::Infra(anyhow!("build {name} view: {error}")))?;
        ctx.register_table(name, Arc::new(view))
            .map_err(|error| SqlError::Infra(anyhow!("register table {name}: {error}")))?;
    }
    // `parts` is exposed through a view that projects every column except `data`.
    let provider = LanceTableProvider::new(tables.parts.clone(), false, false);
    let keep: Vec<_> = tables
        .parts
        .schema()
        .fields
        .iter()
        .filter(|field| field.name != "data")
        .map(|field| col(field.name.as_str()))
        .collect();
    let plan = LogicalPlanBuilder::scan("parts", provider_as_source(Arc::new(provider)), None)
        .and_then(|builder| builder.project(keep))
        .and_then(LogicalPlanBuilder::build)
        .map_err(|error| SqlError::Infra(anyhow!("build parts view: {error}")))?;
    ctx.register_table("parts", Arc::new(ViewTable::new(plan, None)))
        .map_err(|error| SqlError::Infra(anyhow!("register table parts: {error}")))?;
    // The `fts(...)` table function gets the raw datasets, keyed by table name.
    let fts = ScoredFtsUdtf {
        datasets: HashMap::from([
            ("sessions".to_owned(), tables.sessions.clone()),
            ("messages".to_owned(), tables.messages.clone()),
            ("parts".to_owned(), tables.parts.clone()),
        ]),
    };
    ctx.register_udtf("fts", Arc::new(fts));
    register_functions(ctx);
    // Registered after `register_functions`.
    // NOTE(review): presumably so these replace same-named functions registered just above -
    // confirm before reordering.
    for udf in lenient_json_udfs() {
        ctx.register_udf(udf);
    }
    // Expose `any_value` as an alias of the existing `first_value` aggregate, when present.
    if let Some(first_value) = ctx.state().aggregate_functions().get("first_value") {
        ctx.register_udaf(first_value.as_ref().clone().with_aliases(["any_value"]));
    }
    // A scalar function that is also named `fts`; it exists only to return the
    // `FTS_MISUSE` explanation when `fts(...)` is used as a scalar expression.
    ctx.register_udf(ScalarUDF::new_from_impl(FtsMisuse::new()));
    Ok(())
}
617
618fn renamed_view(
622 scan_name: &str,
623 provider: Arc<dyn TableProvider>,
624 from: &str,
625 to: &str,
626) -> Result<ViewTable, DataFusionError> {
627 let projection: Vec<_> = provider
628 .schema()
629 .fields()
630 .iter()
631 .map(|field| {
632 let column = col(field.name().as_str());
633 if field.name() == from {
634 column.alias(to)
635 } else {
636 column
637 }
638 })
639 .collect();
640 let plan = LogicalPlanBuilder::scan(scan_name, provider_as_source(provider), None)?
641 .project(projection)?
642 .build()?;
643 Ok(ViewTable::new(plan, None))
644}
645
/// Explanation returned when `fts(...)` is used as a scalar expression instead of as a
/// table function in FROM.
const FTS_MISUSE: &str = "fts is a table function and goes in FROM, not in WHERE or the \
    projection. For filtering use WHERE contains_tokens(search_text, 'word1 word2') (all \
    words must match; index-accelerated). For ranked results: SELECT m.message_id, f._score \
    FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') f \
    JOIN messages m ON m.message_id = f.message_id ORDER BY f._score DESC.";
651
/// Scalar function named `fts` that never evaluates; it only reports `FTS_MISUSE`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct FtsMisuse {
    /// Accepts any number of arguments of any type, so every scalar call shape reaches the
    /// error below.
    signature: Signature,
}

impl FtsMisuse {
    /// Creates the function with a variadic, any-type signature.
    fn new() -> Self {
        Self {
            signature: Signature::variadic_any(Volatility::Immutable),
        }
    }
}
665
/// Scalar-function plumbing for [`FtsMisuse`]: both type resolution and invocation fail
/// with the `FTS_MISUSE` plan error.
impl ScalarUDFImpl for FtsMisuse {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// Same name as the `fts` table function.
    fn name(&self) -> &str {
        "fts"
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// Always fails with `FTS_MISUSE`.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
    }

    /// Always fails with `FTS_MISUSE`.
    fn invoke_with_args(
        &self,
        _args: ScalarFunctionArgs,
    ) -> Result<ColumnarValue, DataFusionError> {
        Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
    }
}
690
/// The `fts(table_name, fts_query_json)` table function.
#[derive(Debug)]
struct ScoredFtsUdtf {
    /// Datasets that can be searched, keyed by the table name passed as the first argument.
    datasets: HashMap<String, Arc<Dataset>>,
}
706
707impl TableFunctionImpl for ScoredFtsUdtf {
708 fn call(
709 &self,
710 expr: &[Expr],
711 ) -> Result<Arc<dyn TableProvider>, lance::deps::datafusion::error::DataFusionError> {
712 let [table_expr, query_expr] = expr else {
713 return Err(DataFusionError::Execution(
714 "fts() takes (table_name, fts_query_json)".to_owned(),
715 ));
716 };
717 let Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) = table_expr else {
718 return Err(DataFusionError::Execution(
719 "fts() first argument must be a table name string".to_owned(),
720 ));
721 };
722 let Expr::Literal(ScalarValue::Utf8(Some(fts_query)), _) = query_expr else {
723 return Err(DataFusionError::Execution(
724 "fts() second argument must be the fts query as a JSON string".to_owned(),
725 ));
726 };
727 let dataset = self.datasets.get(table_name).ok_or_else(|| {
728 DataFusionError::Execution(format!("fts(): table {table_name} not found"))
729 })?;
730 let mut full_schema = Schema::from(dataset.schema());
731 full_schema = full_schema
732 .try_with_column(Field::new(SCORE_COLUMN, DataType::Float32, true))
733 .map_err(|error| DataFusionError::ArrowError(Box::new(error), None))?;
734 let provider: Arc<dyn TableProvider> = Arc::new(ScoredFtsProvider {
735 dataset: dataset.clone(),
736 fts_query: FullTextSearchQuery::new_query(from_json(fts_query)?),
737 full_schema: Arc::new(full_schema),
738 });
739 match renamed_key(table_name) {
742 Some(key) => Ok(Arc::new(renamed_view("fts", provider, "id", key)?)),
743 None => Ok(provider),
744 }
745 }
746}
747
/// Name of the score column that `fts(...)` appends to the dataset's own columns.
const SCORE_COLUMN: &str = "_score";

/// Table provider behind `fts(...)`: one dataset searched with one full-text query.
#[derive(Debug)]
struct ScoredFtsProvider {
    /// Dataset that is scanned.
    dataset: Arc<Dataset>,
    /// Full-text query applied to every scan.
    fts_query: FullTextSearchQuery,
    /// Dataset schema plus the `SCORE_COLUMN` field; this is the schema reported to the planner.
    full_schema: SchemaRef,
}
756
/// Table-provider plumbing for [`ScoredFtsProvider`]; `scan` builds a Lance scan with the
/// stored full-text query applied.
#[async_trait::async_trait]
impl TableProvider for ScoredFtsProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    /// The dataset schema extended with the score column.
    fn schema(&self) -> SchemaRef {
        self.full_schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    /// Builds the execution plan for one scan.
    ///
    /// `projection` holds indices into `full_schema`; `filters` are AND-ed together and
    /// handed to the Lance scanner; `limit` is forwarded as the row limit. The session state
    /// is not used.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, lance::deps::datafusion::error::DataFusionError> {
        let mut scan = self.dataset.scan();
        scan.full_text_search(self.fts_query.clone())?;
        // NOTE(review): presumably stops the scanner from adding the score column on its
        // own, so the explicit projections below decide whether it appears - confirm against
        // the Lance scanner documentation.
        scan.disable_scoring_autoprojection();
        match projection {
            // The planner asked for no columns at all.
            Some(projection) if projection.is_empty() => {
                scan.empty_project()?;
            }
            // Translate the requested indices into column names of `full_schema`.
            Some(projection) => {
                let columns: Vec<&str> = projection
                    .iter()
                    .map(|idx| self.full_schema.field(*idx).name().as_str())
                    .collect();
                scan.project(&columns)?;
            }
            // No projection given: request every column of `full_schema`, score included.
            None => {
                let columns: Vec<&str> = self
                    .full_schema
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect();
                scan.project(&columns)?;
            }
        }
        // Fold all pushed-down filters into one conjunction; with no filters nothing is set.
        if let Some(combined) = filters
            .iter()
            .cloned()
            .reduce(|left, right| left.and(right))
        {
            scan.filter_expr(combined);
        }
        // `as` would wrap for limits above `i64::MAX`; such limits are not expected here.
        scan.limit(limit.map(|l| l as i64), None)?;
        scan.create_plan().await.map_err(DataFusionError::from)
    }
}
816
/// Output type of one `json_get_*` function.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum JsonGet {
    /// `json_get_string`, returning `Utf8`.
    Text,
    /// `json_get_int`, returning `Int64`.
    Int,
    /// `json_get_float`, returning `Float64`.
    Float,
    /// `json_get_bool`, returning `Boolean`.
    Bool,
}

/// Largest number of key arguments accepted by the `json_get_*` signature.
const MAX_JSON_KEYS: usize = 6;
829
830fn lenient_json_udfs() -> [ScalarUDF; 4] {
840 let make = |name: &'static str, kind: JsonGet, return_type: DataType| {
841 ScalarUDF::new_from_impl(LenientJsonGet {
842 name,
843 kind,
844 return_type,
845 signature: json_key_path_signature(),
846 })
847 };
848 [
849 make("json_get_string", JsonGet::Text, DataType::Utf8),
850 make("json_get_int", JsonGet::Int, DataType::Int64),
851 make("json_get_float", JsonGet::Float, DataType::Float64),
852 make("json_get_bool", JsonGet::Bool, DataType::Boolean),
853 ]
854}
855
856fn json_key_path_signature() -> Signature {
858 let arities = (1..=MAX_JSON_KEYS)
859 .map(|keys| {
860 let mut types = vec![DataType::LargeBinary];
861 types.extend(std::iter::repeat_n(DataType::Utf8, keys));
862 TypeSignature::Exact(types)
863 })
864 .collect();
865 Signature::one_of(arities, Volatility::Immutable)
866}
867
/// One `json_get_*` scalar function: walks a key path into a JSON column and returns the
/// value as `return_type`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct LenientJsonGet {
    /// SQL name of the function.
    name: &'static str,
    /// Which conversion is applied to the value found at the end of the key path.
    kind: JsonGet,
    /// Arrow type reported to the planner.
    return_type: DataType,
    /// Accepted argument shapes.
    signature: Signature,
}
876
/// Scalar-function plumbing for [`LenientJsonGet`]; the work is done by `json_get_lenient`.
impl ScalarUDFImpl for LenientJsonGet {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        self.name
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    /// The configured return type, whatever the argument types are.
    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Ok(self.return_type.clone())
    }

    /// Evaluates the function over the argument columns for this function's kind.
    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue, DataFusionError> {
        json_get_lenient(&args.args, &self.kind)
    }
}
898
899fn json_step(raw: jsonb::RawJsonb<'_>, key: &str) -> Option<jsonb::OwnedJsonb> {
901 let value = if raw.is_object().unwrap_or(false) {
902 raw.get_by_name(key, false).ok().flatten()
903 } else if raw.is_array().unwrap_or(false) {
904 key.parse::<usize>()
905 .ok()
906 .and_then(|index| raw.get_by_index(index).ok().flatten())
907 } else {
908 None
909 };
910 value.filter(|value| !value.as_raw().is_null().unwrap_or(false))
911}
912
/// Shared implementation of the `json_get_*` functions.
///
/// `args` is the JSON column followed by one or more key arguments; `kind` selects the
/// output type. For every row the key path is walked with `json_step`; a row yields SQL
/// `NULL` when the JSON value or any key is null, when any step finds nothing, or (for the
/// numeric and boolean kinds) when the final value cannot be converted.
///
/// # Errors
///
/// Fails when no key is given, when the first argument is not a `LargeBinary` array, or
/// when a key argument is not a `Utf8` array.
fn json_get_lenient(
    args: &[ColumnarValue],
    kind: &JsonGet,
) -> Result<ColumnarValue, DataFusionError> {
    // Work on arrays for every argument, whatever mix of scalars and arrays was passed.
    let arrays = ColumnarValue::values_to_arrays(args)?;
    // First array is the JSON column; at least one key must follow it.
    let Some((jsonb_arg, key_args)) = arrays.split_first().filter(|(_, keys)| !keys.is_empty())
    else {
        return Err(DataFusionError::Execution(
            "json_get_* takes (json_column, 'key', ...) - at least one key".to_owned(),
        ));
    };
    let jsonb_array = jsonb_arg
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .ok_or_else(|| {
            DataFusionError::Execution(
                "json_get_* argument 1 must be a JSON column (variant_data, options)".to_owned(),
            )
        })?;
    let key_arrays: Vec<&StringArray> = key_args
        .iter()
        .map(|key_arg| {
            key_arg
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| {
                    DataFusionError::Execution("json_get_* keys must be string literals".to_owned())
                })
        })
        .collect::<Result<_, _>>()?;

    // Resolves the key path for one row; `None` means "no value" for any of the reasons
    // listed in the function documentation.
    let field = |row: usize| -> Option<jsonb::OwnedJsonb> {
        if jsonb_array.is_null(row) {
            return None;
        }
        let mut keys = key_arrays.iter();
        let first = keys.next()?;
        if first.is_null(row) {
            return None;
        }
        // The first step reads the column bytes directly; later steps read the owned value
        // produced by the previous step.
        let mut current = json_step(
            jsonb::RawJsonb::new(jsonb_array.value(row)),
            first.value(row),
        )?;
        for key_array in keys {
            if key_array.is_null(row) {
                return None;
            }
            current = json_step(current.as_raw(), key_array.value(row))?;
        }
        Some(current)
    };

    let rows = jsonb_array.len();
    let array: Arc<dyn Array> = match kind {
        JsonGet::Text => {
            let mut builder = StringBuilder::with_capacity(rows, 1024);
            for row in 0..rows {
                match field(row) {
                    // NOTE(review): presumably `to_str` succeeds only for JSON strings, so
                    // other values fall back to their `to_string()` rendering - confirm
                    // against the jsonb crate documentation.
                    Some(value) => match value.as_raw().to_str() {
                        Ok(text) => builder.append_value(text),
                        Err(_) => builder.append_value(value.to_string()),
                    },
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        // For the remaining kinds a failed conversion becomes NULL instead of an error.
        JsonGet::Int => {
            let mut builder = Int64Builder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_i64().ok()));
            }
            Arc::new(builder.finish())
        }
        JsonGet::Float => {
            let mut builder = Float64Builder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_f64().ok()));
            }
            Arc::new(builder.finish())
        }
        JsonGet::Bool => {
            let mut builder = BooleanBuilder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_bool().ok()));
            }
            Arc::new(builder.finish())
        }
    };
    Ok(ColumnarValue::Array(array))
}
1007
/// Appends a remediation hint to an error message when the message matches a known failure.
///
/// The table is scanned in order and only the first matching entry is used. Messages that
/// match nothing are returned unchanged.
fn enrich(message: &str) -> String {
    // (substring to look for, hint to append)
    const HINTS: &[(&str, &str)] = &[
        (
            "No field named",
            "columns are messages(session_id, message_id, timestamp, role, source_agent, \
             project, content [system-role only], search_text [the conversational text], \
             embedding_model, options) | sessions(session_id, parent_session_id, \
             parent_message_id, source_agent, created_at, project, options) | \
             parts(session_id, message_id, id, ordinal, type, provenance, variant_data, \
             options). Part bodies (tool params/results, text) live in parts.variant_data - \
             read them with json_extract(variant_data, '$.field'). For text search use \
             contains_tokens(search_text, '...') in WHERE, or the fts('messages', ...) \
             table function in FROM for ranked results; to read a transcript use pond_get. \
             Full doc: resource schema://pond-sql.",
        ),
        (
            "Encountered non UTF-8 data",
            "JSON columns (variant_data, options) are binary JSONB - CAST / ::text does not \
             work on them. Stringify the whole value with json_extract(col, '$'), or fetch \
             one field with json_extract(col, '$.field').",
        ),
        (
            "Resources exhausted",
            "the query ran out of memory - usually from carrying whole JSON columns \
             (variant_data, options) through a join or sort. Project narrow fields with \
             json_extract(col, '$.field') instead of whole columns, filter before joining, \
             or export the full set with format=parquet.",
        ),
        (
            "LIKE prefix queries are not supported for bitmap indexes",
            "prefix LIKE ('x%') and starts_with() fail on bitmap-indexed columns \
             (messages.source_agent, messages.role). Use equality, \
             split_part(source_agent, '/', 1) = '...', or an infix pattern (LIKE '%x%').",
        ),
        (
            "call to 'json_",
            "JSON function signatures: json_get_string|json_get_int|json_get_float|\
             json_get_bool(col, 'key', ...) walk a key path (array steps by numeric \
             index); json_get(col, 'key') returns JSONB for chaining; json_extract(col, \
             '$.a.b') takes a JSONPath and returns JSON text of any value (the right tool \
             for deeply nested or mixed-type fields).",
        ),
        (
            "Invalid function 'json",
            "available JSON functions: json_get_string, json_get_int, json_get_float, \
             json_get_bool (col, 'key', ...); json_get(col, 'key') -> JSONB for chaining; \
             json_extract(col, '$.a.b') -> JSON text; json_array_contains; \
             json_array_length. See resource schema://pond-sql.",
        ),
        (
            "does not satisfy distribution requirements",
            "this fts query shape planned an unexecutable join. For AND semantics use a \
             single match query with operator And: fts('messages', \
             '{\"match\":{\"column\":\"search_text\",\"terms\":\"a b\",\"operator\":\"And\"}}'), \
             optionally with LIKE post-filters in WHERE.",
        ),
        (
            "position is not found but required for phrase queries",
            "the full-text index is built without positions, so \"phrase\" queries are \
             unavailable. Use a match query with operator And plus LIKE post-filters for \
             exact-substring matching.",
        ),
    ];

    let first_match = HINTS
        .iter()
        .find(|(pattern, _)| message.contains(*pattern));
    match first_match {
        Some((_, hint)) => format!("{message}\nhint: {hint}"),
        None => message.to_owned(),
    }
}
1085
1086fn displayable(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
1089 let decoded = lance_arrow::json::convert_lance_json_to_arrow(batch)?;
1090 let keep: Vec<usize> = decoded
1091 .schema()
1092 .fields()
1093 .iter()
1094 .enumerate()
1095 .filter(|(_, field)| is_displayable(field.data_type()))
1096 .map(|(index, _)| index)
1097 .collect();
1098 decoded.project(&keep)
1099}
1100
1101fn is_displayable(data_type: &DataType) -> bool {
1102 !matches!(
1103 data_type,
1104 DataType::FixedSizeList(_, _)
1105 | DataType::Binary
1106 | DataType::LargeBinary
1107 | DataType::BinaryView
1108 | DataType::FixedSizeBinary(_)
1109 )
1110}
1111
1112fn collapse_newlines(batches: &[RecordBatch]) -> Result<Vec<RecordBatch>, ArrowError> {
1118 fn escape<O: OffsetSizeTrait>(array: &GenericStringArray<O>) -> ArrayRef {
1119 let escaped: GenericStringArray<O> =
1120 array.iter().map(|value| value.map(escape_cell)).collect();
1121 Arc::new(escaped)
1122 }
1123 fn escape_cell(text: &str) -> std::borrow::Cow<'_, str> {
1124 if text.contains(['\n', '\r']) {
1125 std::borrow::Cow::Owned(text.replace("\r\n", "\\n").replace(['\n', '\r'], "\\n"))
1126 } else {
1127 std::borrow::Cow::Borrowed(text)
1128 }
1129 }
1130 batches
1131 .iter()
1132 .map(|batch| {
1133 let columns: Vec<ArrayRef> = batch
1134 .columns()
1135 .iter()
1136 .map(|array| match array.data_type() {
1137 DataType::Utf8 => array
1138 .as_any()
1139 .downcast_ref::<StringArray>()
1140 .map_or_else(|| array.clone(), escape),
1141 DataType::LargeUtf8 => array
1142 .as_any()
1143 .downcast_ref::<GenericStringArray<i64>>()
1144 .map_or_else(|| array.clone(), escape),
1145 DataType::Utf8View => array
1146 .as_any()
1147 .downcast_ref::<StringViewArray>()
1148 .map_or_else(
1149 || array.clone(),
1150 |view| {
1151 let escaped: StringViewArray =
1152 view.iter().map(|value| value.map(escape_cell)).collect();
1153 Arc::new(escaped)
1154 },
1155 ),
1156 _ => array.clone(),
1157 })
1158 .collect();
1159 RecordBatch::try_new(batch.schema(), columns)
1160 })
1161 .collect()
1162}
1163
1164fn render_inline(
1165 display: &[RecordBatch],
1166 max_rows: usize,
1167 elapsed: Duration,
1168) -> Result<String, ArrowError> {
1169 let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1170 let elapsed_ms = elapsed.as_millis();
1171 if total == 0 {
1172 return Ok(format!(
1174 "0 rows ({elapsed_ms} ms).\n{}",
1175 pretty_format_batches(display)?
1176 ));
1177 }
1178 let render = |shown: usize| -> Result<String, ArrowError> {
1179 let limited = collapse_newlines(&limit_batches(display, shown))?;
1180 Ok(pretty_format_batches(&limited)?.to_string())
1181 };
1182 let mut shown = total.min(max_rows);
1183 let mut table = render(shown)?;
1184 while table.len() > INLINE_BUDGET_BYTES && shown > 1 {
1185 shown = (shown / 2).max(1);
1186 table = render(shown)?;
1187 }
1188 let mut out = format!("{total} row(s) in {elapsed_ms} ms; showing {shown}.\n{table}");
1189 if shown < total {
1190 out.push_str(&format!(
1191 "\n... {} row(s) omitted. To page: ORDER BY <indexed col> (e.g. timestamp, \
1192 message_id), then in the next call add `WHERE (col, message_id) < \
1193 (<last_col>, <last_message_id>)` - keyset pagination, see schema://pond-sql. \
1194 For the full set: format=parquet or format=ndjson.",
1195 total - shown
1196 ));
1197 }
1198 Ok(out)
1199}
1200
1201fn render_inline_json(
1206 display: &[RecordBatch],
1207 max_rows: usize,
1208 elapsed: Duration,
1209) -> Result<JsonValue, SqlError> {
1210 let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1211 let columns: Vec<String> = display
1212 .first()
1213 .map(|batch| {
1214 batch
1215 .schema()
1216 .fields()
1217 .iter()
1218 .map(|field| field.name().clone())
1219 .collect()
1220 })
1221 .unwrap_or_default();
1222 let elapsed_ms = u64::try_from(elapsed.as_millis()).unwrap_or(u64::MAX);
1223
1224 if total == 0 {
1225 return Ok(json!({
1226 "total_rows": 0,
1227 "shown_rows": 0,
1228 "truncated": false,
1229 "elapsed_ms": elapsed_ms,
1230 "columns": columns,
1231 "rows": [],
1232 }));
1233 }
1234
1235 let mut shown = total.min(max_rows);
1236 let mut rows = batches_to_json_rows(&limit_batches(display, shown))?;
1237 let mut serialized = serde_json::to_string(&rows)
1238 .map_err(|error| SqlError::Infra(anyhow!("json serialize: {error}")))?;
1239 while serialized.len() > INLINE_BUDGET_BYTES && shown > 1 {
1240 shown = (shown / 2).max(1);
1241 rows = batches_to_json_rows(&limit_batches(display, shown))?;
1242 serialized = serde_json::to_string(&rows)
1243 .map_err(|error| SqlError::Infra(anyhow!("json serialize: {error}")))?;
1244 }
1245
1246 let mut payload = JsonMap::new();
1247 payload.insert("total_rows".to_owned(), json!(total));
1248 payload.insert("shown_rows".to_owned(), json!(shown));
1249 payload.insert("truncated".to_owned(), json!(shown < total));
1250 payload.insert("elapsed_ms".to_owned(), json!(elapsed_ms));
1251 payload.insert("columns".to_owned(), json!(columns));
1252 payload.insert("rows".to_owned(), JsonValue::Array(rows));
1253 if shown < total {
1254 payload.insert(
1255 "next_steps".to_owned(),
1256 json!(format!(
1257 "{} row(s) omitted; ORDER BY + keyset (`WHERE (col, message_id) < \
1258 (<last_col>, <last_message_id>)`) to page, or format=parquet|ndjson for \
1259 the full set. See schema://pond-sql.",
1260 total - shown
1261 )),
1262 );
1263 }
1264 Ok(JsonValue::Object(payload))
1265}
1266
1267fn batches_to_json_rows(batches: &[RecordBatch]) -> Result<Vec<JsonValue>, SqlError> {
1271 if batches.iter().all(|batch| batch.num_rows() == 0) {
1272 return Ok(Vec::new());
1273 }
1274 let mut buffer = Vec::new();
1275 {
1276 let mut writer = LineDelimitedWriter::new(&mut buffer);
1277 let refs: Vec<&RecordBatch> = batches.iter().collect();
1278 writer
1279 .write_batches(&refs)
1280 .map_err(|error| SqlError::Infra(anyhow!("ndjson encode: {error}")))?;
1281 writer
1282 .finish()
1283 .map_err(|error| SqlError::Infra(anyhow!("ndjson finish: {error}")))?;
1284 }
1285 let text = String::from_utf8(buffer)
1286 .map_err(|error| SqlError::Infra(anyhow!("ndjson not utf-8: {error}")))?;
1287 text.lines()
1288 .filter(|line| !line.is_empty())
1289 .map(|line| {
1290 serde_json::from_str::<JsonValue>(line)
1291 .map_err(|error| SqlError::Infra(anyhow!("ndjson parse: {error}")))
1292 })
1293 .collect()
1294}
1295
1296fn limit_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<RecordBatch> {
1297 let mut out = Vec::new();
1298 let mut remaining = max_rows;
1299 for batch in batches {
1300 if remaining == 0 {
1301 break;
1302 }
1303 if batch.num_rows() <= remaining {
1304 remaining -= batch.num_rows();
1305 out.push(batch.clone());
1306 } else {
1307 out.push(batch.slice(0, remaining));
1308 remaining = 0;
1309 }
1310 }
1311 out
1312}
1313
1314fn encode_parquet(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1315 let schema = batches
1316 .first()
1317 .map(RecordBatch::schema)
1318 .ok_or_else(|| SqlError::Query("query returned no columns to export".to_owned()))?;
1319 let mut buffer = Vec::new();
1320 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
1321 .map_err(|error| SqlError::Infra(anyhow!("parquet init failed: {error}")))?;
1322 for batch in batches {
1323 writer
1324 .write(batch)
1325 .map_err(|error| SqlError::Infra(anyhow!("parquet write failed: {error}")))?;
1326 }
1327 writer
1328 .close()
1329 .map_err(|error| SqlError::Infra(anyhow!("parquet close failed: {error}")))?;
1330 Ok(buffer)
1331}
1332
1333fn encode_ndjson(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1334 let mut buffer = Vec::new();
1335 {
1336 let mut writer = LineDelimitedWriter::new(&mut buffer);
1337 let refs: Vec<&RecordBatch> = batches.iter().collect();
1338 writer
1339 .write_batches(&refs)
1340 .map_err(|error| SqlError::Infra(anyhow!("ndjson write failed: {error}")))?;
1341 writer
1342 .finish()
1343 .map_err(|error| SqlError::Infra(anyhow!("ndjson finish failed: {error}")))?;
1344 }
1345 Ok(buffer)
1346}
1347
// Unit tests for the SQL gate (`parse_and_gate`), the vector-projection check,
// error enrichment (`enrich`), the JSONB misuse detectors and inline rendering.
#[cfg(test)]
mod tests {
    // Tests use `expect` when building fixtures and rendering.
    #![allow(clippy::expect_used)]

    use super::*;

    /// True when `parse_and_gate` refuses `sql` with a `SqlError::Query`.
    fn rejected(sql: &str) -> bool {
        matches!(parse_and_gate(sql), Err(SqlError::Query(_)))
    }

    /// True when `sql` passes `parse_and_gate` and its kind equals `expected`.
    ///
    /// Only the `Query` and `Explain` kinds are compared; any other pairing,
    /// or a gate error, yields `false`.
    fn parses_as(sql: &str, expected: StatementKind) -> bool {
        match parse_and_gate(sql) {
            Ok(parsed) => matches!(
                (&parsed.kind, &expected),
                (StatementKind::Query, StatementKind::Query)
                    | (StatementKind::Explain, StatementKind::Explain)
            ),
            Err(_) => false,
        }
    }

    /// Plain SELECTs, aggregates and CTE-prefixed SELECTs are classified as queries.
    #[test]
    fn allows_single_select_and_cte() {
        assert!(parses_as("SELECT 1", StatementKind::Query));
        assert!(parses_as(
            "SELECT role, count(*) FROM messages GROUP BY role",
            StatementKind::Query
        ));
        assert!(parses_as(
            "WITH t AS (SELECT 1 AS a) SELECT a FROM t",
            StatementKind::Query
        ));
    }

    /// EXPLAIN and EXPLAIN ANALYZE over a SELECT are classified as explains.
    #[test]
    fn allows_explain_of_select() {
        assert!(parses_as("EXPLAIN SELECT 1", StatementKind::Explain));
        assert!(parses_as(
            "EXPLAIN ANALYZE SELECT role FROM messages",
            StatementKind::Explain
        ));
    }

    /// EXPLAIN wrapped around a write statement is refused.
    #[test]
    fn rejects_explain_of_non_query() {
        assert!(rejected("EXPLAIN INSERT INTO messages VALUES ('x')"));
    }

    /// DML, DDL, COPY and SET statements are all refused by the gate.
    #[test]
    fn rejects_writes_and_side_effects() {
        assert!(rejected("INSERT INTO messages VALUES ('x')"));
        assert!(rejected("UPDATE messages SET role = 'x'"));
        assert!(rejected("DELETE FROM messages"));
        assert!(rejected("CREATE TABLE t (x INT)"));
        assert!(rejected("CREATE VIEW v AS SELECT 1"));
        assert!(rejected("DROP TABLE messages"));
        assert!(rejected(
            "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION '/etc'"
        ));
        assert!(rejected("COPY (SELECT 1) TO '/tmp/x.parquet'"));
        assert!(rejected("SET a = 1"));
    }

    /// More than one statement per input is refused, even when the first is a SELECT.
    #[test]
    fn rejects_multiple_statements() {
        assert!(rejected("SELECT 1; SELECT 2"));
        assert!(rejected("SELECT 1; DROP TABLE messages"));
    }

    /// Input that is not SQL surfaces as a query error.
    #[test]
    fn rejects_unparseable() {
        assert!(rejected("NOT SQL AT ALL ;;"));
    }

    /// Result of `projection_mentions_vector` for `sql`'s projection query;
    /// `false` when the statement does not pass the gate.
    fn mentions_vector(sql: &str) -> bool {
        match parse_and_gate(sql) {
            Ok(parsed) => projection_mentions_vector(parsed.projection_query()),
            Err(_) => false,
        }
    }

    /// `vector` named in the SELECT list is detected: bare, qualified, wrapped
    /// in a function call, or inside an EXPLAIN.
    #[test]
    fn explicit_vector_projection_is_rejected() {
        assert!(mentions_vector("SELECT vector FROM messages"));
        assert!(mentions_vector("SELECT id, vector FROM messages"));
        assert!(mentions_vector("SELECT m.vector FROM messages m"));
        assert!(mentions_vector("SELECT array_length(vector) FROM messages"));
        assert!(mentions_vector("EXPLAIN SELECT vector FROM messages"));
    }

    /// `enrich` keeps the raw message as a prefix and appends a `hint:` that
    /// contains the expected marker; unrecognised messages come back unchanged.
    #[test]
    fn enrich_appends_recovery_hints() {
        // (raw error text, substring the appended hint must contain)
        let cases = [
            (
                "SQL error: Schema error: No field named created_at.",
                "schema://pond-sql",
            ),
            (
                "SQL error: External error: Arrow error: Invalid argument error: \
                 Encountered non UTF-8 data",
                "json_extract",
            ),
            (
                "SQL error: External error: Not supported: LIKE prefix queries are not \
                 supported for bitmap indexes",
                "split_part",
            ),
            (
                "SQL error: Error during planning: Failed to coerce arguments to satisfy \
                 a call to 'json_get_string' function",
                "JSONPath",
            ),
            (
                "SQL error: Error during planning: Invalid function 'json_get_json'.",
                "json_extract",
            ),
            (
                "SQL error: Resources exhausted: Additional allocation failed for \
                 HashJoinInput[0] with top memory consumers",
                "json_extract",
            ),
        ];
        for (raw, marker) in cases {
            let enriched = enrich(raw);
            assert!(enriched.starts_with(raw), "original kept: {enriched}");
            assert!(enriched.contains("hint:"), "hint appended: {enriched}");
            assert!(enriched.contains(marker), "hint names the fix: {enriched}");
        }
        // A message with no matching rule is returned as-is.
        assert_eq!(
            enrich("SQL error: division by zero"),
            "SQL error: division by zero"
        );
    }

    /// `SELECT *` and a `vector` reference confined to WHERE are not flagged.
    #[test]
    fn select_star_and_where_vector_are_allowed() {
        assert!(!mentions_vector("SELECT * FROM messages"));
        assert!(!mentions_vector(
            "SELECT message_id FROM messages WHERE vector IS NOT NULL"
        ));
    }

    /// Casting `variant_data` or `options` to a text type is flagged for both
    /// `CAST(.. AS ..)` and `::` syntax, qualified or nested in a function.
    #[test]
    fn jsonb_cast_misuse_detects_cast_and_coloncolon() {
        for sql in [
            "SELECT CAST(variant_data AS VARCHAR) FROM parts",
            "SELECT cast(p.variant_data as text) FROM parts p",
            "SELECT variant_data::text FROM parts",
            "SELECT p.variant_data :: varchar FROM parts p",
            "SELECT options::text FROM messages",
            "SELECT lower(CAST(variant_data AS VARCHAR)) FROM parts",
        ] {
            assert!(jsonb_cast_misuse(sql), "should reject: {sql}");
        }
    }

    /// JSON accessor calls, casts of other columns, a column whose name merely
    /// ends in `options`, and casts of an extracted value are not flagged.
    #[test]
    fn jsonb_cast_misuse_allows_legitimate_use() {
        for sql in [
            "SELECT json_extract(variant_data, '$') FROM parts",
            "SELECT json_get_string(variant_data, 'name') FROM parts",
            "SELECT CAST(ordinal AS BIGINT) FROM parts",
            "SELECT timestamp::date FROM messages",
            "SELECT my_options::text FROM t",
            "SELECT CAST(json_extract(variant_data, '$.x') AS BIGINT) FROM parts",
        ] {
            assert!(!jsonb_cast_misuse(sql), "should allow: {sql}");
        }
    }

    /// LIKE / ILIKE / NOT LIKE with a leading `%` over `json_extract(.., '$')`
    /// is flagged, including when wrapped in `lower` or combined with a join.
    #[test]
    fn jsonb_fulldoc_like_scan_detects_whole_document_substring() {
        for sql in [
            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE '%needle%'",
            "SELECT * FROM parts p WHERE lower(json_extract(p.variant_data, '$')) LIKE '%x%'",
            "SELECT * FROM messages WHERE json_extract(options, '$') ILIKE '%y%'",
            "SELECT * FROM parts WHERE json_extract(variant_data,'$') NOT LIKE '%z%'",
            "SELECT p.message_id FROM parts p JOIN messages m ON p.message_id = m.message_id \
             WHERE m.timestamp >= '2026-06-11' AND lower(json_extract(p.variant_data, '$')) \
             LIKE '%weekly limit%'",
        ] {
            assert!(jsonb_fulldoc_like_scan(sql), "should reject: {sql}");
        }
    }

    /// Cases that must not be flagged: a targeted JSON path, a pattern without a
    /// leading wildcard, LIKE on `search_text`, `contains_tokens`, and a
    /// whole-document extract that is only projected.
    #[test]
    fn jsonb_fulldoc_like_scan_allows_targeted_and_nonleading() {
        for sql in [
            "SELECT * FROM parts WHERE json_extract(variant_data, '$.name') LIKE '%x%'",
            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE 'pre%'",
            "SELECT * FROM messages WHERE search_text LIKE '%x%'",
            "SELECT * FROM messages WHERE contains_tokens(search_text, 'x')",
            "SELECT json_extract(variant_data, '$') FROM parts LIMIT 1",
        ] {
            assert!(!jsonb_fulldoc_like_scan(sql), "should allow: {sql}");
        }
    }

    /// A cell containing `\n` and `\r\n` is rendered with literal `\n` escapes,
    /// so the row stays on a single physical line of the output.
    #[test]
    fn render_inline_collapses_newlines_in_cells() {
        let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Utf8, true)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(StringArray::from(vec![Some(
                "line one\nline two\r\nline three",
            )]))],
        )
        .expect("single-column batch");
        let out = render_inline(&[batch], 10, Duration::from_millis(1)).expect("render succeeds");
        assert!(
            out.contains("line one\\nline two\\nline three"),
            "newlines collapse to literal \\n: {out}"
        );
        // Exactly one output line may contain the start of the cell text.
        let row_lines: Vec<&str> = out
            .lines()
            .filter(|line| line.contains("line one"))
            .collect();
        assert_eq!(row_lines.len(), 1, "one physical line per row: {out}");
    }
}