1use std::collections::HashMap;
12use std::sync::Arc;
13use std::time::{Duration, Instant};
14
15use anyhow::anyhow;
16use arrow_json::LineDelimitedWriter;
17use lance::Dataset;
18use lance::datafusion::LanceTableProvider;
19use lance::deps::arrow_array::builder::{
20 BooleanBuilder, Float64Builder, Int64Builder, StringBuilder,
21};
22use lance::deps::arrow_array::{
23 Array, ArrayRef, GenericStringArray, LargeBinaryArray, OffsetSizeTrait, RecordBatch,
24 StringArray, StringViewArray,
25};
26use lance::deps::arrow_schema::{ArrowError, DataType, Field, Schema, SchemaRef};
27use lance::deps::datafusion::arrow::util::pretty::pretty_format_batches;
28use lance::deps::datafusion::catalog::{Session, TableFunctionImpl, TableProvider};
29use lance::deps::datafusion::common::ScalarValue;
30use lance::deps::datafusion::datasource::{ViewTable, provider_as_source};
31use lance::deps::datafusion::error::DataFusionError;
32use lance::deps::datafusion::execution::SessionStateBuilder;
33use lance::deps::datafusion::execution::runtime_env::RuntimeEnvBuilder;
34use lance::deps::datafusion::logical_expr::{
35 ColumnarValue, LogicalPlanBuilder, ScalarFunctionArgs, ScalarUDF, ScalarUDFImpl, Signature,
36 TypeSignature, Volatility,
37};
38use lance::deps::datafusion::logical_expr::{Expr, TableType};
39use lance::deps::datafusion::physical_plan::ExecutionPlan;
40use lance::deps::datafusion::prelude::{SQLOptions, SessionConfig, SessionContext, col};
41use lance::deps::datafusion::sql::parser::{DFParser, Statement as DfStatement};
42use lance::deps::datafusion::sql::sqlparser::ast::{SetExpr, Statement as SqlStatement};
43use lance_arrow::SchemaExt;
44use lance_datafusion::udf::register_functions;
45use lance_index::scalar::FullTextSearchQuery;
46use lance_index::scalar::inverted::parser::from_json;
47use parquet::arrow::ArrowWriter;
48
/// Memory limit handed to the DataFusion runtime in `build_context` (512 MiB).
const MEM_LIMIT_BYTES: usize = 512 * 1024 * 1024;
/// Wall-clock limit on collecting a query's results in `run` (planning is not included).
const QUERY_TIMEOUT: Duration = Duration::from_secs(30);
/// Byte budget for the rendered inline table: `render_inline` halves the shown row
/// count until the table fits or only one row is left.
const INLINE_BUDGET_BYTES: usize = 80_000;
/// Largest encoded export `run` returns; bigger exports are rejected as a query error.
const MAX_EXPORT_BYTES: usize = 100 * 1024 * 1024;
/// Default inline row count.
/// NOTE(review): not referenced in this section; presumably used by callers of `run`.
pub const DEFAULT_INLINE_ROWS: usize = 100;
/// Upper bound on inline rows.
/// NOTE(review): `run` does not clamp `inline_rows` to this; presumably the caller does — confirm.
pub const MAX_INLINE_ROWS: usize = 1_000;
64
/// Encoding used when a query result is exported rather than rendered inline.
#[derive(Debug, Clone, Copy)]
pub enum Format {
    /// Apache Parquet bytes.
    Parquet,
    /// Newline-delimited JSON bytes.
    Ndjson,
}

impl Format {
    /// File extension for this format, without the leading dot.
    pub fn ext(self) -> &'static str {
        match self {
            Format::Parquet => "parquet",
            Format::Ndjson => "ndjson",
        }
    }

    /// MIME type to advertise alongside exported bytes of this format.
    pub fn mime(self) -> &'static str {
        match self {
            Format::Ndjson => "application/x-ndjson",
            Format::Parquet => "application/vnd.apache.parquet",
        }
    }
}
88
/// How `run` delivers a query result.
#[derive(Debug, Clone, Copy)]
pub enum Mode {
    /// Render a (row-limited, size-budgeted) text table.
    Inline,
    /// Encode the full result set in the given format.
    Export(Format),
}
97
/// Lance datasets backing the queryable tables.
///
/// A `None` entry means the table is not registered (see `register`). Every query
/// against it then fails during planning.
pub struct Tables {
    pub sessions: Option<Arc<Dataset>>,
    pub messages: Option<Arc<Dataset>>,
    pub parts: Option<Arc<Dataset>>,
}
108
/// Reports whether `sql` contains `table` as a standalone identifier token.
///
/// The statement is split on every character that is neither alphanumeric nor `_`.
/// Each token is compared with `table` ignoring ASCII case, so `FROM Messages`,
/// `m.messages` and `"messages"` all count, while `messages_v2` does not. Because
/// `table` is compared case-insensitively too, callers need not pre-lowercase it.
///
/// This is a lexical check: a name that appears only inside a string literal or a
/// comment also counts. An empty `table` never matches.
pub fn mentions_table(sql: &str, table: &str) -> bool {
    if table.is_empty() {
        return false;
    }
    sql.split(|c: char| !c.is_alphanumeric() && c != '_')
        .any(|token| token.eq_ignore_ascii_case(table))
}
120
/// Successful result of `run`.
pub enum Outcome {
    /// Rendered text table with a row-count / timing header.
    Inline(String),
    /// Encoded result set for download.
    Export {
        /// Encoded bytes (at most `MAX_EXPORT_BYTES`).
        bytes: Vec<u8>,
        /// Encoding of `bytes`.
        format: Format,
        /// Number of rows encoded.
        rows: usize,
        /// Column names, after non-displayable columns were dropped.
        columns: Vec<String>,
    },
}
133
/// Failure of `run`.
#[derive(Debug)]
pub enum SqlError {
    /// A problem with the submitted SQL: parse errors, gate rejections,
    /// planning/execution errors and timeouts. The message is written for the
    /// query author and often carries a `hint:` line.
    Query(String),
    /// An infrastructure failure: runtime setup, table registration, or result
    /// conversion/encoding.
    Infra(anyhow::Error),
}
142
143fn infra(error: ArrowError) -> SqlError {
144 SqlError::Infra(anyhow::Error::new(error))
145}
146
/// Runs one read-only SQL statement against the registered pond tables.
///
/// The statement is parsed and gated first. Exactly one `SELECT`/`WITH` query, or
/// an `EXPLAIN` of one, is accepted. Several known-bad shapes are then rejected up
/// front with a targeted message:
/// - EXPLAIN in export mode;
/// - selecting the `vector` column;
/// - CAST / `::` applied to a JSONB column;
/// - a leading-wildcard LIKE over a whole JSONB document.
///
/// The query is then planned with DDL/DML disallowed and collected under
/// `QUERY_TIMEOUT`.
///
/// * `tables` – datasets to expose; `None` entries are not registered.
/// * `sql` – the statement text.
/// * `mode` – render an inline text table, or encode the full result for export.
/// * `inline_rows` – maximum rows rendered in `Mode::Inline`. It is used as given here.
///   NOTE(review): it is not clamped to `MAX_INLINE_ROWS`; presumably the caller does that — confirm.
///
/// # Errors
///
/// `SqlError::Query` for problems the caller can fix by changing the SQL: parse
/// errors, gate rejections, planning/execution errors, timeouts, and exports over
/// `MAX_EXPORT_BYTES`.
///
/// `SqlError::Infra` for context setup, table registration, and most
/// result-conversion/encoding failures.
pub async fn run(
    tables: &Tables,
    sql: &str,
    mode: Mode,
    inline_rows: usize,
) -> Result<Outcome, SqlError> {
    let parsed = parse_and_gate(sql)?;
    if matches!(parsed.kind, StatementKind::Explain) && matches!(mode, Mode::Export(_)) {
        return Err(SqlError::Query(
            "EXPLAIN returns a plan, not a result set; use format=text (or json) to read it"
                .to_owned(),
        ));
    }
    if projection_mentions_vector(parsed.projection_query()) {
        return Err(SqlError::Query(
            "the `vector` column is not selectable from pond_sql_query (it is a \
             FixedSizeList<f32> embedding, ~600 bytes per row and not useful in a result). \
             For semantic search use pond_search. Filtering on it is allowed in WHERE \
             (e.g. `vector IS NOT NULL`)."
                .to_owned(),
        ));
    }
    // Lexical checks on the raw SQL text, run before any planning happens.
    if jsonb_cast_misuse(sql) {
        return Err(SqlError::Query(
            "CAST / `::` does not work on the binary JSONB columns (variant_data, options) - \
             when the bytes happen to be valid text it can even silently return garbage. \
             Stringify the whole value with json_extract(col, '$') or read one field with \
             json_extract(col, '$.field')."
                .to_owned(),
        ));
    }
    if jsonb_fulldoc_like_scan(sql) {
        return Err(SqlError::Query(
            "a leading-wildcard LIKE over the whole JSONB document - \
             json_extract(variant_data, '$') LIKE '%...%' - stringifies and scans every row, \
             so over parts it will not finish within the time limit. There is no substring \
             index on tool bodies yet (TODO #47: lance v8 FM-Index). Instead match a single \
             field with json_extract(variant_data, '$.field') LIKE '...', scope to one session \
             with session_id = '<id>' and read it with pond_get, or search conversational text \
             with contains_tokens(search_text, '...')."
                .to_owned(),
        ));
    }
    // A fresh context per call: memory-limited runtime plus the pond tables/UDFs.
    let ctx = build_context()?;
    register(&ctx, tables)?;

    // Statement plans are only permitted when the gate classified the input as EXPLAIN.
    let options = SQLOptions::new()
        .with_allow_ddl(false)
        .with_allow_dml(false)
        .with_allow_statements(matches!(parsed.kind, StatementKind::Explain));
    let df = ctx
        .sql_with_options(sql, options)
        .await
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;

    // Captured before collecting so an empty result can still render its headers.
    let result_schema = Arc::new(df.schema().as_arrow().clone());
    // Timing covers execution (collect) only, not parsing or planning.
    let started = Instant::now();
    let collected = tokio::time::timeout(QUERY_TIMEOUT, df.collect())
        .await
        .map_err(|_| {
            SqlError::Query(format!(
                "query exceeded the {}s limit; add a narrower WHERE or a LIMIT. If you were \
                 substring-scanning variant_data (json_extract + LIKE), there is no \
                 substring index on tool bodies yet: filter parts by type and \
                 json_get_string(variant_data, 'name') first, or search conversational \
                 text with contains_tokens(search_text, '...') instead.",
                QUERY_TIMEOUT.as_secs()
            ))
        })?
        .map_err(|error| SqlError::Query(enrich(&format!("SQL error: {error}"))))?;
    let elapsed = started.elapsed();

    // Every batch goes through `displayable` (see there for which columns are kept).
    // An empty result becomes one empty batch, so the column set is still known.
    let display: Vec<RecordBatch> = if collected.is_empty() {
        vec![displayable(&RecordBatch::new_empty(result_schema)).map_err(infra)?]
    } else {
        collected
            .into_iter()
            .map(|batch| displayable(&batch))
            .collect::<Result<_, _>>()
            .map_err(infra)?
    };

    match mode {
        Mode::Inline => Ok(Outcome::Inline(
            render_inline(&display, inline_rows, elapsed).map_err(infra)?,
        )),
        Mode::Export(format) => {
            let rows = display.iter().map(RecordBatch::num_rows).sum();
            let columns = display
                .first()
                .map(|batch| {
                    batch
                        .schema()
                        .fields()
                        .iter()
                        .map(|field| field.name().clone())
                        .collect::<Vec<_>>()
                })
                .unwrap_or_default();
            let bytes = match format {
                Format::Parquet => encode_parquet(&display)?,
                Format::Ndjson => encode_ndjson(&display)?,
            };
            // The size limit is enforced on the encoded output, after encoding.
            if bytes.len() > MAX_EXPORT_BYTES {
                return Err(SqlError::Query(format!(
                    "export is {} bytes, over the {MAX_EXPORT_BYTES} byte limit; \
                     narrow the query or aggregate",
                    bytes.len()
                )));
            }
            Ok(Outcome::Export {
                bytes,
                format,
                rows,
                columns,
            })
        }
    }
}
280
/// Which accepted statement shape `parse_and_gate` found.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
enum StatementKind {
    /// A plain `SELECT` / `WITH` query.
    Query,
    /// `EXPLAIN` wrapping a query. It yields a plan, so `run` rejects it in export mode.
    Explain,
}
289
/// A statement that passed `parse_and_gate`.
struct ParsedStatement {
    /// Whether the statement was a bare query or an EXPLAIN of one.
    kind: StatementKind,
    /// The query AST. For EXPLAIN this is the explained inner query.
    query: lance::deps::datafusion::sql::sqlparser::ast::Query,
}

impl ParsedStatement {
    /// The query whose projection the `vector` gate inspects (the inner query for EXPLAIN).
    fn projection_query(&self) -> &lance::deps::datafusion::sql::sqlparser::ast::Query {
        &self.query
    }
}
305
306fn parse_and_gate(sql: &str) -> Result<ParsedStatement, SqlError> {
313 let statements = DFParser::parse_sql(sql)
314 .map_err(|error| SqlError::Query(format!("SQL parse error: {error}")))?;
315 if statements.len() != 1 {
316 return Err(SqlError::Query(
317 "pond_sql_query runs exactly one statement; submit a single SELECT".to_owned(),
318 ));
319 }
320 let Some(front) = statements.front() else {
321 return Err(read_only_rejection());
322 };
323 match front {
324 DfStatement::Statement(boxed) => match boxed.as_ref() {
325 SqlStatement::Query(query) => Ok(ParsedStatement {
326 kind: StatementKind::Query,
327 query: query.as_ref().clone(),
328 }),
329 _ => Err(read_only_rejection()),
330 },
331 DfStatement::Explain(explain) => match explain.statement.as_ref() {
332 DfStatement::Statement(inner) => match inner.as_ref() {
333 SqlStatement::Query(query) => Ok(ParsedStatement {
334 kind: StatementKind::Explain,
335 query: query.as_ref().clone(),
336 }),
337 _ => Err(read_only_rejection()),
338 },
339 _ => Err(read_only_rejection()),
340 },
341 _ => Err(read_only_rejection()),
342 }
343}
344
345fn read_only_rejection() -> SqlError {
346 SqlError::Query(
347 "pond_sql_query is read-only: only a single SELECT/WITH (or EXPLAIN of one) is \
348 allowed (no INSERT/UPDATE/DELETE/CREATE/DROP/COPY/SET)"
349 .to_owned(),
350 )
351}
352
353fn projection_mentions_vector(query: &lance::deps::datafusion::sql::sqlparser::ast::Query) -> bool {
364 walk_set_expr_for_vector(query.body.as_ref())
365}
366
367fn walk_set_expr_for_vector(expr: &SetExpr) -> bool {
368 match expr {
369 SetExpr::Select(select) => select
370 .projection
371 .iter()
372 .any(|item| mentions_vector_token(&item.to_string())),
373 SetExpr::Query(inner) => walk_set_expr_for_vector(inner.body.as_ref()),
374 SetExpr::SetOperation { left, right, .. } => {
375 walk_set_expr_for_vector(left) || walk_set_expr_for_vector(right)
376 }
377 _ => false,
378 }
379}
380
/// Reports whether `text` contains `vector` as a standalone identifier token,
/// ignoring ASCII case.
///
/// Tokens are split on every character that is neither alphanumeric nor `_`, so
/// `vector_norm` does not match. The comparison ignores case because DataFusion
/// normalizes unquoted identifiers to lowercase: `SELECT VECTOR` selects the same
/// column as `SELECT vector` and must be gated the same way.
fn mentions_vector_token(text: &str) -> bool {
    text.split(|c: char| !c.is_alphanumeric() && c != '_')
        .any(|token| token.eq_ignore_ascii_case("vector"))
}
385
/// Lexically detects a cast applied directly to a JSONB column (`variant_data` or
/// `options`). Two forms are recognized, case-insensitively:
///
/// * postfix: `<col> :: <type>`, where `<col>` is a whole identifier;
/// * call: `CAST(<col> AS <type>)`, optionally with a `qualifier.` before `<col>`.
///
/// The scan is purely textual, so matches inside string literals or comments also count.
fn jsonb_cast_misuse(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    fn ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_'
    }

    let text = sql.to_ascii_lowercase();
    let raw = text.as_bytes();

    // Postfix form: a whole-identifier column followed (after whitespace) by `::`.
    let postfix_cast = JSONB_COLUMNS.iter().any(|column| {
        let mut cursor = 0;
        while let Some(offset) = text[cursor..].find(*column) {
            let from = cursor + offset;
            let to = from + column.len();
            cursor = to;
            let left_ok = from == 0 || !ident_byte(raw[from - 1]);
            let right_ok = to == raw.len() || !ident_byte(raw[to]);
            if left_ok && right_ok && text[to..].trim_start().starts_with("::") {
                return true;
            }
        }
        false
    });
    if postfix_cast {
        return true;
    }

    // Call form: `cast` (not the tail of a longer identifier), then `(`, then the column.
    let mut cursor = 0;
    while let Some(offset) = text[cursor..].find("cast") {
        let at = cursor + offset;
        cursor = at + 4;
        if at > 0 && ident_byte(raw[at - 1]) {
            continue;
        }
        let open = match text[at + 4..].trim_start().strip_prefix('(') {
            Some(open) => open,
            None => continue,
        };
        let mut operand = open.trim_start();
        // Drop a single `qualifier.` prefix such as `p.` in `cast(p.variant_data as text)`.
        if let Some(dot) = operand.find('.') {
            if dot > 0 && operand.as_bytes()[..dot].iter().all(|&b| ident_byte(b)) {
                operand = &operand[dot + 1..];
            }
        }
        let flagged = JSONB_COLUMNS.iter().any(|column| {
            operand.strip_prefix(*column).map_or(false, |after| {
                let glued = after.starts_with(|c: char| c.is_ascii_alphanumeric() || c == '_');
                let as_keyword = after
                    .trim_start()
                    .strip_prefix("as")
                    .map_or(false, |rest| rest.starts_with(char::is_whitespace));
                !glued && as_keyword
            })
        });
        if flagged {
            return true;
        }
    }
    false
}
445
/// Lexically detects `json_extract(<jsonb col>, '$') [NOT] [I]LIKE '%...'`.
///
/// That pattern stringifies whole JSONB documents and scans them with a leading
/// wildcard. The column may carry one `qualifier.` prefix. Any number of closing
/// parentheses may sit between the call and the operator, which covers wrappers
/// such as `lower(...)`. The scan is purely textual and case-insensitive.
fn jsonb_fulldoc_like_scan(sql: &str) -> bool {
    const JSONB_COLUMNS: [&str; 2] = ["variant_data", "options"];
    const NEEDLE: &str = "json_extract";
    fn ident_byte(b: u8) -> bool {
        b.is_ascii_alphanumeric() || b == b'_'
    }

    /// Given the text right after `json_extract(`, returns what follows the
    /// closing `)` when the call is exactly `json_extract(<col>, '$')`.
    fn after_whole_doc_call(args: &str) -> Option<&str> {
        let mut operand = args.trim_start();
        if let Some(dot) = operand.find('.') {
            if dot > 0 && operand.as_bytes()[..dot].iter().all(|&b| ident_byte(b)) {
                operand = &operand[dot + 1..];
            }
        }
        let column = JSONB_COLUMNS.iter().find(|c| operand.starts_with(**c))?;
        let rest = operand[column.len()..].trim_start();
        let rest = rest.strip_prefix(',')?.trim_start();
        let rest = rest.strip_prefix("'$'")?.trim_start();
        rest.strip_prefix(')')
    }

    /// True when `after_call` continues with `)*`, an optional `not`, then
    /// `like`/`ilike` and a pattern literal starting with `'%`.
    fn leading_wildcard_like(after_call: &str) -> bool {
        let mut tail = after_call.trim_start();
        while let Some(next) = tail.strip_prefix(')') {
            tail = next.trim_start();
        }
        if let Some(next) = tail.strip_prefix("not") {
            if next.starts_with(char::is_whitespace) {
                tail = next.trim_start();
            }
        }
        ["like", "ilike"].iter().any(|op| {
            tail.strip_prefix(*op).map_or(false, |next| {
                next.starts_with(char::is_whitespace) && next.trim_start().starts_with("'%")
            })
        })
    }

    let text = sql.to_ascii_lowercase();
    let raw = text.as_bytes();
    let mut cursor = 0;
    while let Some(offset) = text[cursor..].find(NEEDLE) {
        let at = cursor + offset;
        cursor = at + NEEDLE.len();
        // Skip matches that are the tail of a longer identifier (e.g. `my_json_extract`).
        if at > 0 && ident_byte(raw[at - 1]) {
            continue;
        }
        let args = match text[cursor..].trim_start().strip_prefix('(') {
            Some(args) => args,
            None => continue,
        };
        if let Some(after) = after_whole_doc_call(args) {
            if leading_wildcard_like(after) {
                return true;
            }
        }
    }
    false
}
520
521fn build_context() -> Result<SessionContext, SqlError> {
522 let runtime = RuntimeEnvBuilder::new()
523 .with_memory_limit(MEM_LIMIT_BYTES, 1.0)
524 .build_arc()
525 .map_err(|error| SqlError::Infra(anyhow!("datafusion runtime init failed: {error}")))?;
526 let state = SessionStateBuilder::new()
529 .with_config(SessionConfig::new().with_information_schema(true))
530 .with_runtime_env(runtime)
531 .with_default_features()
532 .build();
533 Ok(SessionContext::new_with_state(state))
534}
535
/// Public name of a table's `id` column, for tables whose view renames it.
/// Returns `None` for tables that keep `id` as-is (matching is exact and case-sensitive).
fn renamed_key(table: &str) -> Option<&'static str> {
    const RENAMES: [(&str, &str); 2] = [("messages", "message_id"), ("sessions", "session_id")];
    RENAMES
        .iter()
        .find(|(name, _)| *name == table)
        .map(|&(_, key)| key)
}
547
/// Registers the pond tables and helper functions on `ctx`.
///
/// * `sessions` / `messages` are exposed through a view that renames the dataset's
///   `id` column to `session_id` / `message_id` (see `renamed_key`).
/// * `parts` is exposed through a view projecting every column except `data`.
/// * The `fts(table, query_json)` table function is registered over whichever
///   datasets are present.
/// * Also registered: `lance_datafusion`'s UDFs, the lenient `json_get_*` UDFs, an
///   `any_value` alias of `first_value` (only if that aggregate is found), and a
///   scalar `fts` placeholder (`FtsMisuse`) that fails planning with a usage hint.
///
/// Tables whose dataset is `None` are skipped.
///
/// # Errors
///
/// `SqlError::Infra` when a view cannot be built or a table cannot be registered.
fn register(ctx: &SessionContext, tables: &Tables) -> Result<(), SqlError> {
    for (name, dataset) in [
        ("sessions", &tables.sessions),
        ("messages", &tables.messages),
    ] {
        let Some(dataset) = dataset else { continue };
        // NOTE(review): the two `false` flags are LanceTableProvider options
        // (presumably row-id / row-address columns) — confirm against the lance docs.
        let provider = LanceTableProvider::new(dataset.clone(), false, false);
        // Both names have a renamed key; the "id" fallback would mean "no rename".
        let key = renamed_key(name).unwrap_or("id");
        let view = renamed_view(name, Arc::new(provider), "id", key)
            .map_err(|error| SqlError::Infra(anyhow!("build {name} view: {error}")))?;
        ctx.register_table(name, Arc::new(view))
            .map_err(|error| SqlError::Infra(anyhow!("register table {name}: {error}")))?;
    }
    if let Some(parts) = &tables.parts {
        // `parts` is a projection view that hides the `data` column.
        // NOTE(review): the reason `data` is hidden is not stated here — confirm.
        let provider = LanceTableProvider::new(parts.clone(), false, false);
        let keep: Vec<_> = parts
            .schema()
            .fields
            .iter()
            .filter(|field| field.name != "data")
            .map(|field| col(field.name.as_str()))
            .collect();
        let plan = LogicalPlanBuilder::scan("parts", provider_as_source(Arc::new(provider)), None)
            .and_then(|builder| builder.project(keep))
            .and_then(LogicalPlanBuilder::build)
            .map_err(|error| SqlError::Infra(anyhow!("build parts view: {error}")))?;
        ctx.register_table("parts", Arc::new(ViewTable::new(plan, None)))
            .map_err(|error| SqlError::Infra(anyhow!("register table parts: {error}")))?;
    }
    // The fts() table function sees the raw datasets, keyed by table name.
    let datasets = [
        ("sessions", &tables.sessions),
        ("messages", &tables.messages),
        ("parts", &tables.parts),
    ]
    .into_iter()
    .filter_map(|(name, dataset)| dataset.clone().map(|d| (name.to_owned(), d)))
    .collect();
    let fts = ScoredFtsUdtf { datasets };
    ctx.register_udtf("fts", Arc::new(fts));
    register_functions(ctx);
    for udf in lenient_json_udfs() {
        ctx.register_udf(udf);
    }
    // `any_value` is provided as an alias of the built-in `first_value`, when present.
    if let Some(first_value) = ctx.state().aggregate_functions().get("first_value") {
        ctx.register_udaf(first_value.as_ref().clone().with_aliases(["any_value"]));
    }
    // Scalar `fts(...)` placeholder: using fts outside FROM fails planning with FTS_MISUSE.
    ctx.register_udf(ScalarUDF::new_from_impl(FtsMisuse::new()));
    Ok(())
}
621
622fn renamed_view(
626 scan_name: &str,
627 provider: Arc<dyn TableProvider>,
628 from: &str,
629 to: &str,
630) -> Result<ViewTable, DataFusionError> {
631 let projection: Vec<_> = provider
632 .schema()
633 .fields()
634 .iter()
635 .map(|field| {
636 let column = col(field.name().as_str());
637 if field.name() == from {
638 column.alias(to)
639 } else {
640 column
641 }
642 })
643 .collect();
644 let plan = LogicalPlanBuilder::scan(scan_name, provider_as_source(provider), None)?
645 .project(projection)?
646 .build()?;
647 Ok(ViewTable::new(plan, None))
648}
649
/// Planning error raised by the scalar `fts` placeholder (`FtsMisuse`) when `fts`
/// is used outside FROM. It points to `contains_tokens` for filtering and shows a
/// ranked `fts(...)` join example.
const FTS_MISUSE: &str = "fts is a table function and goes in FROM, not in WHERE or the \
    projection. For filtering use WHERE contains_tokens(search_text, 'word1 word2') (all \
    words must match; index-accelerated). For ranked results: SELECT m.message_id, f._score \
    FROM fts('messages', '{\"match\":{\"column\":\"search_text\",\"terms\":\"...\"}}') f \
    JOIN messages m ON m.message_id = f.message_id ORDER BY f._score DESC.";
655
656#[derive(Debug, PartialEq, Eq, Hash)]
658struct FtsMisuse {
659 signature: Signature,
660}
661
662impl FtsMisuse {
663 fn new() -> Self {
664 Self {
665 signature: Signature::variadic_any(Volatility::Immutable),
666 }
667 }
668}
669
670impl ScalarUDFImpl for FtsMisuse {
671 fn as_any(&self) -> &dyn std::any::Any {
672 self
673 }
674
675 fn name(&self) -> &str {
676 "fts"
677 }
678
679 fn signature(&self) -> &Signature {
680 &self.signature
681 }
682
683 fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
684 Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
685 }
686
687 fn invoke_with_args(
688 &self,
689 _args: ScalarFunctionArgs,
690 ) -> Result<ColumnarValue, DataFusionError> {
691 Err(DataFusionError::Plan(FTS_MISUSE.to_owned()))
692 }
693}
694
/// Table function `fts(table_name, fts_query_json)`.
///
/// Runs a Lance full-text search over one of the datasets and exposes every
/// dataset column plus a nullable Float32 `_score` column. Unlike the `parts` table
/// registered in `register`, no columns are hidden here. For `messages` / `sessions`
/// the result is wrapped in `renamed_view`, so `id` appears as
/// `message_id` / `session_id`.
#[derive(Debug)]
struct ScoredFtsUdtf {
    /// Datasets addressable by `fts`, keyed by table name.
    datasets: HashMap<String, Arc<Dataset>>,
}

impl TableFunctionImpl for ScoredFtsUdtf {
    /// Builds the provider for one `fts(...)` call.
    ///
    /// # Errors
    ///
    /// `DataFusionError::Execution` unless there are exactly two arguments that are
    /// both non-null `Utf8` string literals, or when the table is unknown. The query
    /// JSON is parsed by lance's `from_json`, and its errors are propagated.
    fn call(
        &self,
        expr: &[Expr],
    ) -> Result<Arc<dyn TableProvider>, lance::deps::datafusion::error::DataFusionError> {
        let [table_expr, query_expr] = expr else {
            return Err(DataFusionError::Execution(
                "fts() takes (table_name, fts_query_json)".to_owned(),
            ));
        };
        let Expr::Literal(ScalarValue::Utf8(Some(table_name)), _) = table_expr else {
            return Err(DataFusionError::Execution(
                "fts() first argument must be a table name string".to_owned(),
            ));
        };
        let Expr::Literal(ScalarValue::Utf8(Some(fts_query)), _) = query_expr else {
            return Err(DataFusionError::Execution(
                "fts() second argument must be the fts query as a JSON string".to_owned(),
            ));
        };
        let dataset = self.datasets.get(table_name).ok_or_else(|| {
            DataFusionError::Execution(format!("fts(): table {table_name} not found"))
        })?;
        // The provider schema is the full dataset schema plus the `_score` column.
        let mut full_schema = Schema::from(dataset.schema());
        full_schema = full_schema
            .try_with_column(Field::new(SCORE_COLUMN, DataType::Float32, true))
            .map_err(|error| DataFusionError::ArrowError(Box::new(error), None))?;
        let provider: Arc<dyn TableProvider> = Arc::new(ScoredFtsProvider {
            dataset: dataset.clone(),
            fts_query: FullTextSearchQuery::new_query(from_json(fts_query)?),
            full_schema: Arc::new(full_schema),
        });
        // Apply the same `id` rename as the registered table, so joins line up.
        match renamed_key(table_name) {
            Some(key) => Ok(Arc::new(renamed_view("fts", provider, "id", key)?)),
            None => Ok(provider),
        }
    }
}
751
/// Name of the relevance column that `fts(...)` adds to the dataset schema.
const SCORE_COLUMN: &str = "_score";
753
/// Table provider behind one `fts(...)` call.
///
/// `scan` is a Lance scan of `dataset` with `fts_query` applied, exposing `full_schema`
/// (the dataset schema plus `_score`).
#[derive(Debug)]
struct ScoredFtsProvider {
    dataset: Arc<Dataset>,
    fts_query: FullTextSearchQuery,
    full_schema: SchemaRef,
}

#[async_trait::async_trait]
impl TableProvider for ScoredFtsProvider {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn schema(&self) -> SchemaRef {
        self.full_schema.clone()
    }

    fn table_type(&self) -> TableType {
        TableType::Temporary
    }

    /// Plans the Lance scan in this order: full-text search, projection (mapped
    /// from `full_schema` indices to column names), filters AND-ed together, then limit.
    ///
    /// NOTE(review): `supports_filters_pushdown` is not overridden in this impl, so
    /// under DataFusion's default `filters` is presumably always empty here — confirm.
    async fn scan(
        &self,
        _state: &dyn Session,
        projection: Option<&Vec<usize>>,
        filters: &[Expr],
        limit: Option<usize>,
    ) -> Result<Arc<dyn ExecutionPlan>, lance::deps::datafusion::error::DataFusionError> {
        let mut scan = self.dataset.scan();
        scan.full_text_search(self.fts_query.clone())?;
        // Presumably stops lance from appending `_score` by itself, so the output
        // matches the requested projection exactly — confirm.
        scan.disable_scoring_autoprojection();
        match projection {
            // e.g. COUNT(*): no columns are needed.
            Some(projection) if projection.is_empty() => {
                scan.empty_project()?;
            }
            Some(projection) => {
                let columns: Vec<&str> = projection
                    .iter()
                    .map(|idx| self.full_schema.field(*idx).name().as_str())
                    .collect();
                scan.project(&columns)?;
            }
            None => {
                let columns: Vec<&str> = self
                    .full_schema
                    .fields()
                    .iter()
                    .map(|field| field.name().as_str())
                    .collect();
                scan.project(&columns)?;
            }
        }
        if let Some(combined) = filters
            .iter()
            .cloned()
            .reduce(|left, right| left.and(right))
        {
            scan.filter_expr(combined);
        }
        scan.limit(limit.map(|l| l as i64), None)?;
        scan.create_plan().await.map_err(DataFusionError::from)
    }
}
820
/// Target type a `json_get_*` UDF converts the located JSON value to.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
enum JsonGet {
    /// `json_get_string` -> Utf8.
    Text,
    /// `json_get_int` -> Int64.
    Int,
    /// `json_get_float` -> Float64.
    Float,
    /// `json_get_bool` -> Boolean.
    Bool,
}
829
/// Maximum number of key-path steps a `json_get_*` call accepts. The signature
/// enumerates 1..=MAX_JSON_KEYS Utf8 keys after the LargeBinary column.
const MAX_JSON_KEYS: usize = 6;
833
834fn lenient_json_udfs() -> [ScalarUDF; 4] {
844 let make = |name: &'static str, kind: JsonGet, return_type: DataType| {
845 ScalarUDF::new_from_impl(LenientJsonGet {
846 name,
847 kind,
848 return_type,
849 signature: json_key_path_signature(),
850 })
851 };
852 [
853 make("json_get_string", JsonGet::Text, DataType::Utf8),
854 make("json_get_int", JsonGet::Int, DataType::Int64),
855 make("json_get_float", JsonGet::Float, DataType::Float64),
856 make("json_get_bool", JsonGet::Bool, DataType::Boolean),
857 ]
858}
859
860fn json_key_path_signature() -> Signature {
862 let arities = (1..=MAX_JSON_KEYS)
863 .map(|keys| {
864 let mut types = vec![DataType::LargeBinary];
865 types.extend(std::iter::repeat_n(DataType::Utf8, keys));
866 TypeSignature::Exact(types)
867 })
868 .collect();
869 Signature::one_of(arities, Volatility::Immutable)
870}
871
/// One `json_get_*` scalar UDF: walks a key path through a JSONB column and
/// converts the result per `kind`. Evaluation is delegated to `json_get_lenient`.
#[derive(Debug, PartialEq, Eq, Hash)]
struct LenientJsonGet {
    /// SQL function name, e.g. `json_get_string`.
    name: &'static str,
    /// Conversion applied to the located value.
    kind: JsonGet,
    /// Arrow type returned regardless of argument types.
    return_type: DataType,
    /// Accepted argument types (see `json_key_path_signature`).
    signature: Signature,
}

impl ScalarUDFImpl for LenientJsonGet {
    fn as_any(&self) -> &dyn std::any::Any {
        self
    }

    fn name(&self) -> &str {
        self.name
    }

    fn signature(&self) -> &Signature {
        &self.signature
    }

    fn return_type(&self, _arg_types: &[DataType]) -> Result<DataType, DataFusionError> {
        Ok(self.return_type.clone())
    }

    fn invoke_with_args(&self, args: ScalarFunctionArgs) -> Result<ColumnarValue, DataFusionError> {
        json_get_lenient(&args.args, &self.kind)
    }
}
902
903fn json_step(raw: jsonb::RawJsonb<'_>, key: &str) -> Option<jsonb::OwnedJsonb> {
905 let value = if raw.is_object().unwrap_or(false) {
906 raw.get_by_name(key, false).ok().flatten()
907 } else if raw.is_array().unwrap_or(false) {
908 key.parse::<usize>()
909 .ok()
910 .and_then(|index| raw.get_by_index(index).ok().flatten())
911 } else {
912 None
913 };
914 value.filter(|value| !value.as_raw().is_null().unwrap_or(false))
915}
916
/// Evaluates a `json_get_*` call row by row.
///
/// `args[0]` must be a LargeBinary (JSONB) array and `args[1..]` one or more Utf8
/// key arrays. Scalars are expanded to arrays first. A row yields NULL when:
/// - the JSON cell is NULL;
/// - any key is NULL;
/// - a step misses or lands on JSON null (see `json_step`);
/// - for Int/Float/Bool, the located value does not convert.
///
/// For `Text`, string values are returned as-is. Other values fall back to the
/// `OwnedJsonb` `Display` text (presumably JSON).
///
/// # Errors
///
/// `DataFusionError::Execution` when no key is given, when the first argument is
/// not a `LargeBinaryArray`, or when a key argument is not a `StringArray`.
fn json_get_lenient(
    args: &[ColumnarValue],
    kind: &JsonGet,
) -> Result<ColumnarValue, DataFusionError> {
    let arrays = ColumnarValue::values_to_arrays(args)?;
    let Some((jsonb_arg, key_args)) = arrays.split_first().filter(|(_, keys)| !keys.is_empty())
    else {
        return Err(DataFusionError::Execution(
            "json_get_* takes (json_column, 'key', ...) - at least one key".to_owned(),
        ));
    };
    let jsonb_array = jsonb_arg
        .as_any()
        .downcast_ref::<LargeBinaryArray>()
        .ok_or_else(|| {
            DataFusionError::Execution(
                "json_get_* argument 1 must be a JSON column (variant_data, options)".to_owned(),
            )
        })?;
    let key_arrays: Vec<&StringArray> = key_args
        .iter()
        .map(|key_arg| {
            key_arg
                .as_any()
                .downcast_ref::<StringArray>()
                .ok_or_else(|| {
                    DataFusionError::Execution("json_get_* keys must be string literals".to_owned())
                })
        })
        .collect::<Result<_, _>>()?;

    // Walks the key path for one row; any NULL or miss along the way yields None.
    let field = |row: usize| -> Option<jsonb::OwnedJsonb> {
        if jsonb_array.is_null(row) {
            return None;
        }
        let mut keys = key_arrays.iter();
        let first = keys.next()?;
        if first.is_null(row) {
            return None;
        }
        // The first step reads the stored bytes directly; later steps read the owned result.
        let mut current = json_step(
            jsonb::RawJsonb::new(jsonb_array.value(row)),
            first.value(row),
        )?;
        for key_array in keys {
            if key_array.is_null(row) {
                return None;
            }
            current = json_step(current.as_raw(), key_array.value(row))?;
        }
        Some(current)
    };

    let rows = jsonb_array.len();
    let array: Arc<dyn Array> = match kind {
        JsonGet::Text => {
            let mut builder = StringBuilder::with_capacity(rows, 1024);
            for row in 0..rows {
                match field(row) {
                    Some(value) => match value.as_raw().to_str() {
                        Ok(text) => builder.append_value(text),
                        // Not a JSON string: fall back to the value's Display text.
                        Err(_) => builder.append_value(value.to_string()),
                    },
                    None => builder.append_null(),
                }
            }
            Arc::new(builder.finish())
        }
        JsonGet::Int => {
            let mut builder = Int64Builder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_i64().ok()));
            }
            Arc::new(builder.finish())
        }
        JsonGet::Float => {
            let mut builder = Float64Builder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_f64().ok()));
            }
            Arc::new(builder.finish())
        }
        JsonGet::Bool => {
            let mut builder = BooleanBuilder::with_capacity(rows);
            for row in 0..rows {
                builder.append_option(field(row).and_then(|value| value.as_raw().to_bool().ok()));
            }
            Arc::new(builder.finish())
        }
    };
    Ok(ColumnarValue::Array(array))
}
1011
/// Appends a `hint:` line to a DataFusion/Lance error message when it contains a
/// known substring. Patterns are tried in table order and only the first match is
/// used. Messages with no match are returned unchanged.
fn enrich(message: &str) -> String {
    const HINTS: [(&str, &str); 8] = [
        (
            "No field named",
            "columns are messages(session_id, message_id, timestamp, role, source_agent, \
             project, content [system-role only], search_text [the conversational text], \
             embedding_model, options) | sessions(session_id, parent_session_id, \
             parent_message_id, source_agent, created_at, project, options) | \
             parts(session_id, message_id, id, ordinal, type, provenance, variant_data, \
             options). Part bodies (tool params/results, text) live in parts.variant_data - \
             read them with json_extract(variant_data, '$.field'). For text search use \
             contains_tokens(search_text, '...') in WHERE, or the fts('messages', ...) \
             table function in FROM for ranked results; to read a transcript use pond_get. \
             Full doc: resource schema://pond-sql.",
        ),
        (
            "Encountered non UTF-8 data",
            "JSON columns (variant_data, options) are binary JSONB - CAST / ::text does not \
             work on them. Stringify the whole value with json_extract(col, '$'), or fetch \
             one field with json_extract(col, '$.field').",
        ),
        (
            "Resources exhausted",
            "the query ran out of memory - usually from carrying whole JSON columns \
             (variant_data, options) through a join or sort. Project narrow fields with \
             json_extract(col, '$.field') instead of whole columns, filter before joining, \
             or export the full set with format=parquet.",
        ),
        (
            "LIKE prefix queries are not supported for bitmap indexes",
            "prefix LIKE ('x%') and starts_with() fail on bitmap-indexed columns \
             (messages.source_agent, messages.role). Use equality, \
             split_part(source_agent, '/', 1) = '...', or an infix pattern (LIKE '%x%').",
        ),
        (
            "call to 'json_",
            "JSON function signatures: json_get_string|json_get_int|json_get_float|\
             json_get_bool(col, 'key', ...) walk a key path (array steps by numeric \
             index); json_get(col, 'key') returns JSONB for chaining; json_extract(col, \
             '$.a.b') takes a JSONPath and returns JSON text of any value (the right tool \
             for deeply nested or mixed-type fields).",
        ),
        (
            "Invalid function 'json",
            "available JSON functions: json_get_string, json_get_int, json_get_float, \
             json_get_bool (col, 'key', ...); json_get(col, 'key') -> JSONB for chaining; \
             json_extract(col, '$.a.b') -> JSON text; json_array_contains; \
             json_array_length. See resource schema://pond-sql.",
        ),
        (
            "does not satisfy distribution requirements",
            "this fts query shape planned an unexecutable join. For AND semantics use a \
             single match query with operator And: fts('messages', \
             '{\"match\":{\"column\":\"search_text\",\"terms\":\"a b\",\"operator\":\"And\"}}'), \
             optionally with LIKE post-filters in WHERE.",
        ),
        (
            "position is not found but required for phrase queries",
            "the full-text index is built without positions, so \"phrase\" queries are \
             unavailable. Use a match query with operator And plus LIKE post-filters for \
             exact-substring matching.",
        ),
    ];
    match HINTS.iter().find(|(pattern, _)| message.contains(*pattern)) {
        Some((_, hint)) => format!("{message}\nhint: {hint}"),
        None => String::from(message),
    }
}
1089
1090fn displayable(batch: &RecordBatch) -> Result<RecordBatch, ArrowError> {
1093 let decoded = lance_arrow::json::convert_lance_json_to_arrow(batch)?;
1094 let keep: Vec<usize> = decoded
1095 .schema()
1096 .fields()
1097 .iter()
1098 .enumerate()
1099 .filter(|(_, field)| is_displayable(field.data_type()))
1100 .map(|(index, _)| index)
1101 .collect();
1102 decoded.project(&keep)
1103}
1104
1105fn is_displayable(data_type: &DataType) -> bool {
1106 !matches!(
1107 data_type,
1108 DataType::FixedSizeList(_, _)
1109 | DataType::Binary
1110 | DataType::LargeBinary
1111 | DataType::BinaryView
1112 | DataType::FixedSizeBinary(_)
1113 )
1114}
1115
1116fn collapse_newlines(batches: &[RecordBatch]) -> Result<Vec<RecordBatch>, ArrowError> {
1122 fn escape<O: OffsetSizeTrait>(array: &GenericStringArray<O>) -> ArrayRef {
1123 let escaped: GenericStringArray<O> =
1124 array.iter().map(|value| value.map(escape_cell)).collect();
1125 Arc::new(escaped)
1126 }
1127 fn escape_cell(text: &str) -> std::borrow::Cow<'_, str> {
1128 if text.contains(['\n', '\r']) {
1129 std::borrow::Cow::Owned(text.replace("\r\n", "\\n").replace(['\n', '\r'], "\\n"))
1130 } else {
1131 std::borrow::Cow::Borrowed(text)
1132 }
1133 }
1134 batches
1135 .iter()
1136 .map(|batch| {
1137 let columns: Vec<ArrayRef> = batch
1138 .columns()
1139 .iter()
1140 .map(|array| match array.data_type() {
1141 DataType::Utf8 => array
1142 .as_any()
1143 .downcast_ref::<StringArray>()
1144 .map_or_else(|| array.clone(), escape),
1145 DataType::LargeUtf8 => array
1146 .as_any()
1147 .downcast_ref::<GenericStringArray<i64>>()
1148 .map_or_else(|| array.clone(), escape),
1149 DataType::Utf8View => array
1150 .as_any()
1151 .downcast_ref::<StringViewArray>()
1152 .map_or_else(
1153 || array.clone(),
1154 |view| {
1155 let escaped: StringViewArray =
1156 view.iter().map(|value| value.map(escape_cell)).collect();
1157 Arc::new(escaped)
1158 },
1159 ),
1160 _ => array.clone(),
1161 })
1162 .collect();
1163 RecordBatch::try_new(batch.schema(), columns)
1164 })
1165 .collect()
1166}
1167
1168fn render_inline(
1169 display: &[RecordBatch],
1170 max_rows: usize,
1171 elapsed: Duration,
1172) -> Result<String, ArrowError> {
1173 let total: usize = display.iter().map(RecordBatch::num_rows).sum();
1174 let elapsed_ms = elapsed.as_millis();
1175 if total == 0 {
1176 return Ok(format!(
1178 "0 rows ({elapsed_ms} ms).\n{}",
1179 pretty_format_batches(display)?
1180 ));
1181 }
1182 let render = |shown: usize| -> Result<String, ArrowError> {
1183 let limited = collapse_newlines(&limit_batches(display, shown))?;
1184 Ok(pretty_format_batches(&limited)?.to_string())
1185 };
1186 let mut shown = total.min(max_rows);
1187 let mut table = render(shown)?;
1188 while table.len() > INLINE_BUDGET_BYTES && shown > 1 {
1189 shown = (shown / 2).max(1);
1190 table = render(shown)?;
1191 }
1192 let mut out = format!("{total} row(s) in {elapsed_ms} ms; showing {shown}.\n{table}");
1193 if shown < total {
1194 out.push_str(&format!(
1195 "\n... {} row(s) omitted. To page: ORDER BY <indexed col> (e.g. timestamp, \
1196 message_id), then in the next call add `WHERE (col, message_id) < \
1197 (<last_col>, <last_message_id>)` - keyset pagination, see schema://pond-sql. \
1198 For the full set: format=parquet or format=ndjson.",
1199 total - shown
1200 ));
1201 }
1202 Ok(out)
1203}
1204
1205fn limit_batches(batches: &[RecordBatch], max_rows: usize) -> Vec<RecordBatch> {
1206 let mut out = Vec::new();
1207 let mut remaining = max_rows;
1208 for batch in batches {
1209 if remaining == 0 {
1210 break;
1211 }
1212 if batch.num_rows() <= remaining {
1213 remaining -= batch.num_rows();
1214 out.push(batch.clone());
1215 } else {
1216 out.push(batch.slice(0, remaining));
1217 remaining = 0;
1218 }
1219 }
1220 out
1221}
1222
1223fn encode_parquet(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1224 let schema = batches
1225 .first()
1226 .map(RecordBatch::schema)
1227 .ok_or_else(|| SqlError::Query("query returned no columns to export".to_owned()))?;
1228 let mut buffer = Vec::new();
1229 let mut writer = ArrowWriter::try_new(&mut buffer, schema, None)
1230 .map_err(|error| SqlError::Infra(anyhow!("parquet init failed: {error}")))?;
1231 for batch in batches {
1232 writer
1233 .write(batch)
1234 .map_err(|error| SqlError::Infra(anyhow!("parquet write failed: {error}")))?;
1235 }
1236 writer
1237 .close()
1238 .map_err(|error| SqlError::Infra(anyhow!("parquet close failed: {error}")))?;
1239 Ok(buffer)
1240}
1241
1242fn encode_ndjson(batches: &[RecordBatch]) -> Result<Vec<u8>, SqlError> {
1243 let mut buffer = Vec::new();
1244 {
1245 let mut writer = LineDelimitedWriter::new(&mut buffer);
1246 let refs: Vec<&RecordBatch> = batches.iter().collect();
1247 writer
1248 .write_batches(&refs)
1249 .map_err(|error| SqlError::Infra(anyhow!("ndjson write failed: {error}")))?;
1250 writer
1251 .finish()
1252 .map_err(|error| SqlError::Infra(anyhow!("ndjson finish failed: {error}")))?;
1253 }
1254 Ok(buffer)
1255}
1256
#[cfg(test)]
mod tests {
    #![allow(clippy::expect_used)]

    use super::*;

    // True when the gate refuses `sql` with a user-facing query error.
    fn rejected(sql: &str) -> bool {
        matches!(parse_and_gate(sql), Err(SqlError::Query(_)))
    }

    // True when `sql` passes the gate and is classified as `expected`.
    fn parses_as(sql: &str, expected: StatementKind) -> bool {
        match parse_and_gate(sql) {
            Ok(parsed) => matches!(
                (&parsed.kind, &expected),
                (StatementKind::Query, StatementKind::Query)
                    | (StatementKind::Explain, StatementKind::Explain)
            ),
            Err(_) => false,
        }
    }

    // Single nullable Utf8 column named `t` holding `values`, one row per value.
    fn text_batch(values: &[&str]) -> RecordBatch {
        let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Utf8, true)]));
        RecordBatch::try_new(schema, vec![Arc::new(StringArray::from(values.to_vec()))])
            .expect("single-column batch")
    }

    #[test]
    fn mentions_table_is_sound_for_open_pruning() {
        assert!(mentions_table("SELECT * FROM messages", "messages"));
        assert!(mentions_table("select * from MESSAGES", "messages"));
        assert!(mentions_table(
            "SELECT s.id FROM sessions s JOIN parts p ON s.id = p.session_id",
            "parts",
        ));
        assert!(mentions_table(
            "SELECT * FROM fts('messages', '{\"match\":{}}')",
            "messages",
        ));
        assert!(mentions_table(
            "WITH x AS (SELECT * FROM sessions) SELECT * FROM x",
            "sessions",
        ));
        assert!(!mentions_table("SELECT * FROM messages", "parts"));
        assert!(!mentions_table("SELECT * FROM messages", "sessions"));
        assert!(!mentions_table(
            "SELECT counterparts FROM messages",
            "parts"
        ));
    }

    #[test]
    fn allows_single_select_and_cte() {
        assert!(parses_as("SELECT 1", StatementKind::Query));
        assert!(parses_as(
            "SELECT role, count(*) FROM messages GROUP BY role",
            StatementKind::Query
        ));
        assert!(parses_as(
            "WITH t AS (SELECT 1 AS a) SELECT a FROM t",
            StatementKind::Query
        ));
    }

    #[test]
    fn allows_explain_of_select() {
        assert!(parses_as("EXPLAIN SELECT 1", StatementKind::Explain));
        assert!(parses_as(
            "EXPLAIN ANALYZE SELECT role FROM messages",
            StatementKind::Explain
        ));
    }

    #[test]
    fn rejects_explain_of_non_query() {
        assert!(rejected("EXPLAIN INSERT INTO messages VALUES ('x')"));
    }

    #[test]
    fn rejects_writes_and_side_effects() {
        assert!(rejected("INSERT INTO messages VALUES ('x')"));
        assert!(rejected("UPDATE messages SET role = 'x'"));
        assert!(rejected("DELETE FROM messages"));
        assert!(rejected("CREATE TABLE t (x INT)"));
        assert!(rejected("CREATE VIEW v AS SELECT 1"));
        assert!(rejected("DROP TABLE messages"));
        assert!(rejected(
            "CREATE EXTERNAL TABLE t STORED AS PARQUET LOCATION '/etc'"
        ));
        assert!(rejected("COPY (SELECT 1) TO '/tmp/x.parquet'"));
        assert!(rejected("SET a = 1"));
    }

    #[test]
    fn rejects_multiple_statements() {
        assert!(rejected("SELECT 1; SELECT 2"));
        assert!(rejected("SELECT 1; DROP TABLE messages"));
    }

    #[test]
    fn rejects_unparseable() {
        assert!(rejected("NOT SQL AT ALL ;;"));
    }

    // True when `sql` passes the gate and its projection references `vector`.
    fn mentions_vector(sql: &str) -> bool {
        match parse_and_gate(sql) {
            Ok(parsed) => projection_mentions_vector(parsed.projection_query()),
            Err(_) => false,
        }
    }

    #[test]
    fn explicit_vector_projection_is_rejected() {
        assert!(mentions_vector("SELECT vector FROM messages"));
        assert!(mentions_vector("SELECT id, vector FROM messages"));
        assert!(mentions_vector("SELECT m.vector FROM messages m"));
        assert!(mentions_vector("SELECT array_length(vector) FROM messages"));
        assert!(mentions_vector("EXPLAIN SELECT vector FROM messages"));
    }

    #[test]
    fn enrich_appends_recovery_hints() {
        let cases = [
            (
                "SQL error: Schema error: No field named created_at.",
                "schema://pond-sql",
            ),
            (
                "SQL error: External error: Arrow error: Invalid argument error: \
                 Encountered non UTF-8 data",
                "json_extract",
            ),
            (
                "SQL error: External error: Not supported: LIKE prefix queries are not \
                 supported for bitmap indexes",
                "split_part",
            ),
            (
                "SQL error: Error during planning: Failed to coerce arguments to satisfy \
                 a call to 'json_get_string' function",
                "JSONPath",
            ),
            (
                "SQL error: Error during planning: Invalid function 'json_get_json'.",
                "json_extract",
            ),
            (
                "SQL error: Resources exhausted: Additional allocation failed for \
                 HashJoinInput[0] with top memory consumers",
                "json_extract",
            ),
        ];
        for (raw, marker) in cases {
            let enriched = enrich(raw);
            assert!(enriched.starts_with(raw), "original kept: {enriched}");
            assert!(enriched.contains("hint:"), "hint appended: {enriched}");
            assert!(enriched.contains(marker), "hint names the fix: {enriched}");
        }
        assert_eq!(
            enrich("SQL error: division by zero"),
            "SQL error: division by zero"
        );
    }

    #[test]
    fn select_star_and_where_vector_are_allowed() {
        assert!(!mentions_vector("SELECT * FROM messages"));
        assert!(!mentions_vector(
            "SELECT message_id FROM messages WHERE vector IS NOT NULL"
        ));
    }

    #[test]
    fn jsonb_cast_misuse_detects_cast_and_coloncolon() {
        for sql in [
            "SELECT CAST(variant_data AS VARCHAR) FROM parts",
            "SELECT cast(p.variant_data as text) FROM parts p",
            "SELECT variant_data::text FROM parts",
            "SELECT p.variant_data :: varchar FROM parts p",
            "SELECT options::text FROM messages",
            "SELECT lower(CAST(variant_data AS VARCHAR)) FROM parts",
        ] {
            assert!(jsonb_cast_misuse(sql), "should reject: {sql}");
        }
    }

    #[test]
    fn jsonb_cast_misuse_allows_legitimate_use() {
        for sql in [
            "SELECT json_extract(variant_data, '$') FROM parts",
            "SELECT json_get_string(variant_data, 'name') FROM parts",
            "SELECT CAST(ordinal AS BIGINT) FROM parts",
            "SELECT timestamp::date FROM messages",
            "SELECT my_options::text FROM t",
            "SELECT CAST(json_extract(variant_data, '$.x') AS BIGINT) FROM parts",
        ] {
            assert!(!jsonb_cast_misuse(sql), "should allow: {sql}");
        }
    }

    #[test]
    fn jsonb_fulldoc_like_scan_detects_whole_document_substring() {
        for sql in [
            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE '%needle%'",
            "SELECT * FROM parts p WHERE lower(json_extract(p.variant_data, '$')) LIKE '%x%'",
            "SELECT * FROM messages WHERE json_extract(options, '$') ILIKE '%y%'",
            "SELECT * FROM parts WHERE json_extract(variant_data,'$') NOT LIKE '%z%'",
            "SELECT p.message_id FROM parts p JOIN messages m ON p.message_id = m.message_id \
             WHERE m.timestamp >= '2026-06-11' AND lower(json_extract(p.variant_data, '$')) \
             LIKE '%weekly limit%'",
        ] {
            assert!(jsonb_fulldoc_like_scan(sql), "should reject: {sql}");
        }
    }

    #[test]
    fn jsonb_fulldoc_like_scan_allows_targeted_and_nonleading() {
        for sql in [
            "SELECT * FROM parts WHERE json_extract(variant_data, '$.name') LIKE '%x%'",
            "SELECT * FROM parts WHERE json_extract(variant_data, '$') LIKE 'pre%'",
            "SELECT * FROM messages WHERE search_text LIKE '%x%'",
            "SELECT * FROM messages WHERE contains_tokens(search_text, 'x')",
            "SELECT json_extract(variant_data, '$') FROM parts LIMIT 1",
        ] {
            assert!(!jsonb_fulldoc_like_scan(sql), "should allow: {sql}");
        }
    }

    #[test]
    fn render_inline_collapses_newlines_in_cells() {
        let schema = Arc::new(Schema::new(vec![Field::new("t", DataType::Utf8, true)]));
        let batch = RecordBatch::try_new(
            schema,
            vec![Arc::new(StringArray::from(vec![Some(
                "line one\nline two\r\nline three",
            )]))],
        )
        .expect("single-column batch");
        let out = render_inline(&[batch], 10, Duration::from_millis(1)).expect("render succeeds");
        assert!(
            out.contains("line one\\nline two\\nline three"),
            "newlines collapse to literal \\n: {out}"
        );
        let row_lines: Vec<&str> = out
            .lines()
            .filter(|line| line.contains("line one"))
            .collect();
        assert_eq!(row_lines.len(), 1, "one physical line per row: {out}");
    }

    #[test]
    fn render_inline_halves_rows_until_within_budget() {
        // 400 rows of ~305 bytes each (~122 KB) exceed the 80 KB budget; 200 rows
        // (~61 KB) fit, so exactly one halving step is expected.
        let cell = "x".repeat(300);
        let rows: Vec<&str> = vec![cell.as_str(); 400];
        let out = render_inline(&[text_batch(&rows)], 400, Duration::from_millis(1))
            .expect("render succeeds");
        assert!(out.starts_with("400 row(s) in 1 ms; showing 200."), "{out}");
        assert!(out.contains("200 row(s) omitted"), "omission noted: {out}");
    }

    #[test]
    fn render_inline_reports_empty_result() {
        let out = render_inline(&[text_batch(&[])], 10, Duration::from_millis(1))
            .expect("render succeeds");
        assert!(out.starts_with("0 rows (1 ms).\n"), "{out}");
    }

    #[test]
    fn limit_batches_slices_across_batch_boundary() {
        let batches = [text_batch(&["a", "b", "c"]), text_batch(&["d", "e", "f"])];
        let sizes: Vec<usize> = limit_batches(&batches, 4)
            .iter()
            .map(RecordBatch::num_rows)
            .collect();
        assert_eq!(sizes, vec![3, 1]);
        assert!(limit_batches(&batches, 0).is_empty());
        assert_eq!(limit_batches(&batches, 10).len(), 2);
    }

    #[test]
    fn encode_parquet_requires_a_batch() {
        assert!(matches!(encode_parquet(&[]), Err(SqlError::Query(_))));
    }

    #[test]
    fn encode_parquet_produces_framed_file() {
        let encoded = encode_parquet(&[text_batch(&["x"])]);
        assert!(
            matches!(&encoded, Ok(bytes) if bytes.starts_with(b"PAR1") && bytes.ends_with(b"PAR1")),
            "parquet magic at both ends"
        );
    }

    #[test]
    fn encode_ndjson_writes_one_object_per_row() {
        let encoded = encode_ndjson(&[text_batch(&["a", "b"])]);
        assert!(
            matches!(&encoded, Ok(bytes) if bytes.as_slice() == b"{\"t\":\"a\"}\n{\"t\":\"b\"}\n"),
            "one JSON object per line"
        );
        assert!(matches!(encode_ndjson(&[]), Ok(bytes) if bytes.is_empty()));
    }
}