use std::ops::Range;
use std::pin::Pin;
use std::sync::{Arc, LazyLock};
use std::task::{Context, Poll};

use arrow::array::AsArray;
use arrow_array::{Array, Float32Array, Int64Array, RecordBatch};
use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, SortOptions};
use arrow_select::concat::concat_batches;
use async_recursion::async_recursion;
use chrono::Utc;
use datafusion::common::{exec_datafusion_err, DFSchema, NullEquality, SchemaExt};
use datafusion::functions_aggregate;
use datafusion::functions_aggregate::count::count_udaf;
use datafusion::logical_expr::{col, lit, Expr, ScalarUDF};
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
use datafusion::physical_plan::expressions;
use datafusion::physical_plan::projection::ProjectionExec as DFProjectionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{
    aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
    display::DisplayableExecutionPlan,
    expressions::Literal,
    limit::GlobalLimitExec,
    repartition::RepartitionExec,
    union::UnionExec,
    ExecutionPlan, SendableRecordBatchStream,
};
use datafusion::scalar::ScalarValue;
use datafusion_expr::execution_props::ExecutionProps;
use datafusion_expr::ExprSchemable;
use datafusion_functions::core::getfield::GetFieldFunc;
use datafusion_physical_expr::{aggregate::AggregateExprBuilder, expressions::Column};
use datafusion_physical_expr::{create_physical_expr, LexOrdering, Partitioning, PhysicalExpr};
use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
use datafusion_physical_plan::{empty::EmptyExec, joins::HashJoinExec};
use futures::future::BoxFuture;
use futures::stream::{Stream, StreamExt};
use futures::{FutureExt, TryStreamExt};
use lance_arrow::floats::{coerce_float_vector, FloatType};
use lance_arrow::DataTypeExt;
use lance_core::datatypes::{
    escape_field_path_for_project, format_field_path, Field, OnMissing, Projection,
};
use lance_core::error::LanceOptionExt;
use lance_core::utils::address::RowAddress;
use lance_core::utils::mask::{RowIdMask, RowIdTreeMap};
use lance_core::utils::tokio::get_num_compute_intensive_cpus;
use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET};
use lance_datafusion::exec::{
    analyze_plan, execute_plan, LanceExecutionOptions, OneShotExec, StrictBatchSizeExec,
};
use lance_datafusion::expr::safe_coerce_scalar;
use lance_datafusion::projection::ProjectionPlan;
use lance_file::v2::reader::FileReaderOptions;
use lance_index::scalar::expression::{IndexExprResult, PlannerIndexExt, INDEX_EXPR_RESULT_SCHEMA};
use lance_index::scalar::inverted::query::{
    fill_fts_query_column, FtsQuery, FtsSearchParams, MatchQuery, PhraseQuery,
};
use lance_index::scalar::inverted::SCORE_COL;
use lance_index::scalar::FullTextSearchQuery;
use lance_index::vector::{Query, DIST_COL};
use lance_index::ScalarIndexCriteria;
use lance_index::{metrics::NoOpMetricsCollector, scalar::inverted::FTS_SCHEMA};
use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt};
use lance_io::stream::RecordBatchStream;
use lance_linalg::distance::MetricType;
use lance_table::format::{Fragment, IndexMetadata};
use roaring::RoaringBitmap;
use tracing::{info_span, instrument, Span};

use super::Dataset;
use crate::dataset::row_offsets_to_row_addresses;
use crate::dataset::utils::SchemaAdapter;
use crate::index::vector::utils::{get_vector_dim, get_vector_type};
use crate::index::DatasetIndexInternalExt;
use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions};
use crate::io::exec::fts::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec};
use crate::io::exec::knn::MultivectorScoringExec;
use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec};
use crate::io::exec::{get_physical_optimizer, AddRowOffsetExec, LanceFilterExec, LanceScanConfig};
use crate::io::exec::{
    knn::new_knn_exec, project, AddRowAddrExec, FilterPlan, KNNVectorDistanceExec,
    LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec,
};
use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec};
use crate::{Error, Result};

use snafu::location;

pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts};
#[cfg(feature = "substrait")]
use lance_datafusion::substrait::parse_substrait;

pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192;
pub fn get_default_batch_size() -> Option<usize> {
    // An environment variable can override the batch size for all scans; if it
    // is unset, callers fall back to a size based on the storage block size.
    std::env::var("LANCE_DEFAULT_BATCH_SIZE")
        .map(|val| Some(val.parse().unwrap()))
        .unwrap_or(None)
}
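// Illustration (not from the original source) of how the environment variable
// interacts with the default above:
//
//     // LANCE_DEFAULT_BATCH_SIZE=1024  -> get_default_batch_size() == Some(1024)
//     // variable unset                 -> get_default_batch_size() == None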

pub const LEGACY_DEFAULT_FRAGMENT_READAHEAD: usize = 4;

pub static DEFAULT_FRAGMENT_READAHEAD: LazyLock<Option<usize>> = LazyLock::new(|| {
    std::env::var("LANCE_DEFAULT_FRAGMENT_READAHEAD")
        .map(|val| Some(val.parse().unwrap()))
        .unwrap_or(None)
});

pub static DEFAULT_XTR_OVERFETCH: LazyLock<u32> = LazyLock::new(|| {
    std::env::var("LANCE_XTR_OVERFETCH")
        .map(|val| val.parse().unwrap())
        .unwrap_or(10)
});

pub static DEFAULT_IO_BUFFER_SIZE: LazyLock<u64> = LazyLock::new(|| {
    // Defaults to 2 GiB unless overridden by the environment variable
    std::env::var("LANCE_DEFAULT_IO_BUFFER_SIZE")
        .map(|val| val.parse().unwrap())
        .unwrap_or(2 * 1024 * 1024 * 1024)
});

/// Defines an ordering on a single column, for use with [Scanner::order_by].
#[derive(Debug, Clone)]
pub struct ColumnOrdering {
    pub ascending: bool,
    pub nulls_first: bool,
    pub column_name: String,
}

impl ColumnOrdering {
    pub fn asc_nulls_first(column_name: String) -> Self {
        Self {
            ascending: true,
            nulls_first: true,
            column_name,
        }
    }

    pub fn asc_nulls_last(column_name: String) -> Self {
        Self {
            ascending: true,
            nulls_first: false,
            column_name,
        }
    }

    pub fn desc_nulls_first(column_name: String) -> Self {
        Self {
            ascending: false,
            nulls_first: true,
            column_name,
        }
    }

    pub fn desc_nulls_last(column_name: String) -> Self {
        Self {
            ascending: false,
            nulls_first: false,
            column_name,
        }
    }
}
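// Example (sketch): orderings for use with `Scanner::order_by`, assuming the
// dataset has "age" and "name" columns:
//
//     let ordering = vec![
//         ColumnOrdering::desc_nulls_last("age".to_string()),
//         ColumnOrdering::asc_nulls_first("name".to_string()),
//     ];
//     scanner.order_by(Some(ordering))?;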

/// Controls when a column is fetched ("materialized") during a filtered scan.
///
/// Early materialization loads a column before the filter is applied, which
/// wastes IO on rows the filter later drops but avoids a second read.  Late
/// materialization applies the filter first and then takes only the matching
/// rows, which saves IO on wide columns at the cost of extra seeks.
#[derive(Clone)]
pub enum MaterializationStyle {
    /// Pick early or late materialization per column based on its estimated
    /// byte width and on whether the dataset lives on cloud storage (the default)
    Heuristic,
    /// Materialize all columns late (after the filter)
    AllLate,
    /// Materialize all columns early (before the filter)
    AllEarly,
    /// Materialize all columns early except those with the given field ids
    AllEarlyExcept(Vec<u32>),
}

impl MaterializationStyle {
    pub fn all_early_except(columns: &[impl AsRef<str>], schema: &Schema) -> Result<Self> {
        let field_ids = schema
            .project(columns)?
            .field_ids()
            .into_iter()
            .map(|id| id as u32)
            .collect();
        Ok(Self::AllEarlyExcept(field_ids))
    }
}
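// Example (sketch): keep every column early except a hypothetical wide
// "embedding" column, so the filter runs before that column is fetched:
//
//     let style =
//         MaterializationStyle::all_early_except(&["embedding"], dataset.schema())?;
//     scanner.materialization_style(style);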

#[derive(Debug)]
struct PlannedFilteredScan {
    plan: Arc<dyn ExecutionPlan>,
    limit_pushed_down: bool,
    filter_pushed_down: bool,
}

/// A filter expression, which may be supplied in several forms.
#[derive(Debug, Clone)]
pub enum LanceFilter {
    /// An SQL predicate string
    Sql(String),
    /// A Substrait-encoded expression
    Substrait(Vec<u8>),
    /// A DataFusion logical expression
    Datafusion(Expr),
}

impl LanceFilter {
    /// Converts the filter into a DataFusion [Expr].
    ///
    /// `full_schema` is used to parse and type-check SQL filters and may
    /// contain metadata columns (e.g. row id / row address) in addition to
    /// the dataset columns.  Substrait filters are parsed against
    /// `dataset_schema` only.
    #[allow(unused)]
    #[instrument(level = "trace", name = "filter_to_df", skip_all)]
    pub fn to_datafusion(&self, dataset_schema: &Schema, full_schema: &Schema) -> Result<Expr> {
        match self {
            Self::Sql(sql) => {
                let schema = Arc::new(ArrowSchema::from(full_schema));
                let planner = Planner::new(schema.clone());
                let filter = planner.parse_filter(sql)?;

                let df_schema = DFSchema::try_from(schema)?;
                let (ret_type, _) = filter.data_type_and_nullable(&df_schema)?;
                if ret_type != DataType::Boolean {
                    return Err(Error::InvalidInput {
                        source: format!("The filter {} does not return a boolean", filter).into(),
                        location: location!(),
                    });
                }

                let optimized = planner.optimize_expr(filter).map_err(|e| {
                    Error::invalid_input(
                        format!("Error optimizing sql filter: {sql} ({e})"),
                        location!(),
                    )
                })?;
                Ok(optimized)
            }
            #[cfg(feature = "substrait")]
            Self::Substrait(expr) => {
                use lance_datafusion::exec::{get_session_context, LanceExecutionOptions};

                let ctx = get_session_context(&LanceExecutionOptions::default());
                let state = ctx.state();
                let schema = Arc::new(ArrowSchema::from(dataset_schema));
                let expr = parse_substrait(expr, schema.clone(), &state)
                    .now_or_never()
                    .expect("could not parse the Substrait filter in a synchronous fashion")?;
                let planner = Planner::new(schema);
                planner.optimize_expr(expr.clone()).map_err(|e| {
                    Error::invalid_input(
                        format!("Error optimizing substrait filter: {expr:?} ({e})"),
                        location!(),
                    )
                })
            }
            #[cfg(not(feature = "substrait"))]
            Self::Substrait(_) => {
                panic!("Substrait filter is not supported in this build");
            }
            Self::Datafusion(expr) => Ok(expr.clone()),
        }
    }
}

/// Dataset Scanner
///
/// Configures a scan or search over a dataset and builds the execution plan
/// that produces the results.
#[derive(Clone)]
pub struct Scanner {
    dataset: Arc<Dataset>,

    /// The projection plan describing the output columns, including any
    /// dynamic expressions computed from them
    projection_plan: ProjectionPlan,

    /// If true, filters are applied before a vector / FTS search instead of after
    prefilter: bool,

    /// Controls when columns are fetched relative to the filter
    materialization_style: MaterializationStyle,

    /// Optional filter expression
    filter: Option<LanceFilter>,

    /// Optional full text search query
    full_text_query: Option<FullTextSearchQuery>,

    /// Optional batch size; if unset, a default is derived from the storage block size
    batch_size: Option<usize>,

    /// Number of batches to read ahead
    batch_readahead: usize,

    /// Number of fragments to read ahead
    fragment_readahead: Option<usize>,

    /// Soft limit (in bytes) on the amount of data buffered by IO
    io_buffer_size: Option<u64>,

    limit: Option<i64>,
    offset: Option<i64>,

    /// Optional output ordering; sorting requires reading the sort columns
    ordering: Option<Vec<ColumnOrdering>>,

    nearest: Option<Query>,

    /// Whether scalar indexes may be used to satisfy filters
    use_scalar_index: bool,

    /// Whether to use statistics-based pushdown (legacy format only)
    use_stats: bool,

    /// If false, the scan may return fragments in any order
    ordered: bool,

    /// If set, the scan is restricted to these fragments
    fragments: Option<Vec<Fragment>>,

    /// If true, a vector search only consults the index, skipping any
    /// unindexed (newly added) rows
    fast_search: bool,

    /// If true, deleted rows are included in the output (requires row ids)
    include_deleted_rows: bool,

    /// Optional callback invoked with execution statistics when the scan finishes
    scan_stats_callback: Option<ExecutionStatsCallback>,

    /// If true, every batch except the last has exactly the requested batch size
    strict_batch_size: bool,

    /// Optional overrides for the file reader
    file_reader_options: Option<FileReaderOptions>,

    /// Legacy flag: `with_row_id` was called, so `_rowid` is appended to the
    /// output (and moved to the end of explicit projections)
    legacy_with_row_id: bool,
    /// Legacy flag: `with_row_address` was called (see `legacy_with_row_id`)
    legacy_with_row_addr: bool,
    /// True if the user explicitly selected the output columns
    explicit_projection: bool,
    /// If true (default), `_distance` / `_score` are appended to explicit
    /// projections during vector / FTS searches (deprecated behavior)
    autoproject_scoring_columns: bool,
}

/// A "take" operation recognized from a filter expression, identifying rows
/// directly instead of scanning.
#[derive(Debug, Clone)]
pub enum TakeOperation {
    /// Take rows by stable row id
    RowIds(Vec<u64>),
    /// Take rows by physical row address
    RowAddrs(Vec<u64>),
    /// Take rows by offset into the dataset
    RowOffsets(Vec<u64>),
}

impl TakeOperation {
    fn extract_u64_list(list: &[Expr]) -> Option<Vec<u64>> {
        let mut u64s = Vec::with_capacity(list.len());
        for expr in list {
            if let Expr::Literal(lit, _) = expr {
                if let Some(ScalarValue::UInt64(Some(val))) =
                    safe_coerce_scalar(lit, &DataType::UInt64)
                {
                    u64s.push(val);
                } else {
                    return None;
                }
            } else {
                return None;
            }
        }
        Some(u64s)
    }

    fn merge(self, other: Self) -> Option<Self> {
        match (self, other) {
            (Self::RowIds(mut left), Self::RowIds(right)) => {
                left.extend(right);
                Some(Self::RowIds(left))
            }
            (Self::RowAddrs(mut left), Self::RowAddrs(right)) => {
                left.extend(right);
                Some(Self::RowAddrs(left))
            }
            (Self::RowOffsets(mut left), Self::RowOffsets(right)) => {
                left.extend(right);
                Some(Self::RowOffsets(left))
            }
            _ => None,
        }
    }

    /// Attempt to interpret a filter expression as a take operation.
    ///
    /// Recognizes predicates such as `_rowid = 42` or `_rowaddr IN (1, 2, 3)`.
    /// Returns the recognized operation together with any remaining filter
    /// that still needs to be applied to the taken rows.
    ///
    /// An `AND` can combine one take operation with an arbitrary remainder.
    /// An `OR` can merge two take operations of the same kind, provided
    /// neither side has a remainder.
    fn try_from_expr(expr: &Expr) -> Option<(Self, Option<Expr>)> {
        if let Expr::BinaryExpr(binary) = expr {
            match binary.op {
                datafusion_expr::Operator::And => {
                    let left_take = Self::try_from_expr(&binary.left);
                    let right_take = Self::try_from_expr(&binary.right);
                    match (left_take, right_take) {
                        (Some(_), Some(_)) => {
                            // Two take operations ANDed together cannot be
                            // planned as a single take
                            return None;
                        }
                        (Some((left_op, left_rem)), None) => {
                            let remainder = match left_rem {
                                Some(expr) => Expr::and(expr, binary.right.as_ref().clone()),
                                None => binary.right.as_ref().clone(),
                            };
                            return Some((left_op, Some(remainder)));
                        }
                        (None, Some((right_op, right_rem))) => {
                            let remainder = match right_rem {
                                Some(expr) => Expr::and(expr, binary.left.as_ref().clone()),
                                None => binary.left.as_ref().clone(),
                            };
                            return Some((right_op, Some(remainder)));
                        }
                        (None, None) => {
                            return None;
                        }
                    }
                }
                datafusion_expr::Operator::Eq => {
                    // Only `col = literal` (column on the left) is recognized
                    if let (Expr::Column(col), Expr::Literal(lit, _)) =
                        (binary.left.as_ref(), binary.right.as_ref())
                    {
                        if let Some(ScalarValue::UInt64(Some(val))) =
                            safe_coerce_scalar(lit, &DataType::UInt64)
                        {
                            if col.name == ROW_ID {
                                return Some((Self::RowIds(vec![val]), None));
                            } else if col.name == ROW_ADDR {
                                return Some((Self::RowAddrs(vec![val]), None));
                            } else if col.name == ROW_OFFSET {
                                return Some((Self::RowOffsets(vec![val]), None));
                            }
                        }
                    }
                }
                datafusion_expr::Operator::Or => {
                    let left_take = Self::try_from_expr(&binary.left);
                    let right_take = Self::try_from_expr(&binary.right);
                    if let (Some(left), Some(right)) = (left_take, right_take) {
                        if left.1.is_some() || right.1.is_some() {
                            // If either side has a leftover filter, the OR
                            // cannot be expressed as a pure take
                            return None;
                        }
                        return left.0.merge(right.0).map(|op| (op, None));
                    }
                }
                _ => {}
            }
        } else if let Expr::InList(in_expr) = expr {
            if let Expr::Column(col) = in_expr.expr.as_ref() {
                if let Some(u64s) = Self::extract_u64_list(&in_expr.list) {
                    if col.name == ROW_ID {
                        return Some((Self::RowIds(u64s), None));
                    } else if col.name == ROW_ADDR {
                        return Some((Self::RowAddrs(u64s), None));
                    } else if col.name == ROW_OFFSET {
                        return Some((Self::RowOffsets(u64s), None));
                    }
                }
            }
        }
        None
    }
}
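// Illustration (not from the original source): filters that `try_from_expr`
// can turn into take operations:
//
//     _rowid = 42                  -> RowIds([42]), no remainder
//     _rowaddr IN (1, 2, 3)        -> RowAddrs([1, 2, 3]), no remainder
//     _rowid = 42 AND price > 10   -> RowIds([42]), remainder `price > 10`
//     _rowid = 1 OR _rowid = 2     -> RowIds([1, 2]), no remainder
//     _rowid = 1 OR price > 10     -> not a take operation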

impl Scanner {
    pub fn new(dataset: Arc<Dataset>) -> Self {
        let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap();
        let file_reader_options = dataset.file_reader_options.clone();
        Self {
            dataset,
            projection_plan,
            prefilter: false,
            materialization_style: MaterializationStyle::Heuristic,
            filter: None,
            full_text_query: None,
            batch_size: None,
            batch_readahead: get_num_compute_intensive_cpus(),
            fragment_readahead: None,
            io_buffer_size: None,
            limit: None,
            offset: None,
            ordering: None,
            nearest: None,
            use_stats: true,
            ordered: true,
            fragments: None,
            fast_search: false,
            use_scalar_index: true,
            include_deleted_rows: false,
            scan_stats_callback: None,
            strict_batch_size: false,
            file_reader_options,
            legacy_with_row_addr: false,
            legacy_with_row_id: false,
            explicit_projection: false,
            autoproject_scoring_columns: true,
        }
    }

    pub fn from_fragment(dataset: Arc<Dataset>, fragment: Fragment) -> Self {
        Self {
            fragments: Some(vec![fragment]),
            ..Self::new(dataset)
        }
    }

    /// Set the fragments to scan.
    ///
    /// This overrides any fragment set by [Self::from_fragment].
    pub fn with_fragments(&mut self, fragments: Vec<Fragment>) -> &mut Self {
        self.fragments = Some(fragments);
        self
    }

    fn get_batch_size(&self) -> usize {
        // The LANCE_DEFAULT_BATCH_SIZE environment variable takes precedence,
        // then any user-specified batch size, and finally a default derived
        // from the object store block size.
        get_default_batch_size().unwrap_or_else(|| {
            self.batch_size.unwrap_or_else(|| {
                std::cmp::max(
                    self.dataset.object_store().block_size() / 4,
                    BATCH_SIZE_FALLBACK,
                )
            })
        })
    }

    fn ensure_not_fragment_scan(&self) -> Result<()> {
        if self.is_fragment_scan() {
            Err(Error::io(
                "This operation is not supported for fragment scan".to_string(),
                location!(),
            ))
        } else {
            Ok(())
        }
    }

    fn is_fragment_scan(&self) -> bool {
        self.fragments.is_some()
    }

    /// Set the projection to an empty list of columns.
    ///
    /// Useful together with [Self::with_row_id] to fetch only row ids.
    pub fn empty_project(&mut self) -> Result<&mut Self> {
        self.project(&[] as &[&str])
    }

    /// Set the projection to the given list of columns.
    ///
    /// Nested fields can be addressed with dotted paths (e.g. `"a.b"`).
    pub fn project<T: AsRef<str>>(&mut self, columns: &[T]) -> Result<&mut Self> {
        let transformed_columns: Vec<(&str, String)> = columns
            .iter()
            .map(|c| (c.as_ref(), escape_field_path_for_project(c.as_ref())))
            .collect();

        self.project_with_transform(&transformed_columns)
    }

    /// Set the projection to a list of `(name, expression)` pairs.
    ///
    /// Each output column is computed from an SQL expression over the
    /// dataset's columns.
    pub fn project_with_transform(
        &mut self,
        columns: &[(impl AsRef<str>, impl AsRef<str>)],
    ) -> Result<&mut Self> {
        self.explicit_projection = true;
        self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?;
        if self.legacy_with_row_id {
            self.projection_plan.include_row_id();
        }
        if self.legacy_with_row_addr {
            self.projection_plan.include_row_addr();
        }
        Ok(self)
    }
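    // Example (sketch): plain and computed projections, assuming the dataset
    // has columns "a" and "b":
    //
    //     scanner.project(&["a", "b"])?;
    //     // or derive output columns from SQL expressions:
    //     scanner.project_with_transform(&[("a2", "a * 2"), ("sum", "a + b")])?;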

    /// If true, filters are applied before a vector / FTS search (prefilter)
    /// rather than after (postfilter).
    ///
    /// With a prefilter the search returns up to `k` rows that already satisfy
    /// the filter.  With a postfilter the filter may discard some of the `k`
    /// search results, so fewer than `k` rows can be returned.
    pub fn prefilter(&mut self, should_prefilter: bool) -> &mut Self {
        self.prefilter = should_prefilter;
        self
    }

    /// Set a callback that receives execution statistics when the scan completes.
    pub fn scan_stats_callback(&mut self, callback: ExecutionStatsCallback) -> &mut Self {
        self.scan_stats_callback = Some(callback);
        self
    }

    /// Set the materialization style for the scan (see [MaterializationStyle]).
    pub fn materialization_style(&mut self, style: MaterializationStyle) -> &mut Self {
        self.materialization_style = style;
        self
    }

    /// Apply a filter expressed as an SQL predicate, e.g. `a > 10 AND b != 'abc'`.
    ///
    /// Only rows satisfying the predicate are returned.
    pub fn filter(&mut self, filter: &str) -> Result<&mut Self> {
        self.filter = Some(LanceFilter::Sql(filter.to_string()));
        Ok(self)
    }

    /// Set a full text search query.
    ///
    /// Every column referenced by the query must exist in the dataset and
    /// results are scored using the matching inverted index.
    pub fn full_text_search(&mut self, query: FullTextSearchQuery) -> Result<&mut Self> {
        let fields = query.columns();
        if !fields.is_empty() {
            for field in fields.iter() {
                if self.dataset.schema().field(field).is_none() {
                    return Err(Error::invalid_input(
                        format!("Column {} not found", field),
                        location!(),
                    ));
                }
            }
        }

        self.full_text_query = Some(query);
        Ok(self)
    }
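    // Example (sketch): a full text search, assuming `query` is a
    // `FullTextSearchQuery` targeting an FTS-indexed column:
    //
    //     scanner.full_text_search(query)?;
    //     let batch = scanner.try_into_batch().await?;
    //     // the output includes a `_score` column unless
    //     // `disable_scoring_autoprojection` was called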

    /// Apply a filter expressed as a Substrait-encoded expression.
    pub fn filter_substrait(&mut self, filter: &[u8]) -> Result<&mut Self> {
        self.filter = Some(LanceFilter::Substrait(filter.to_vec()));
        Ok(self)
    }

    /// Apply a filter expressed as a DataFusion logical expression.
    pub fn filter_expr(&mut self, filter: Expr) -> &mut Self {
        self.filter = Some(LanceFilter::Datafusion(filter));
        self
    }

    /// Set the maximum number of rows per output batch.
    pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
        self.batch_size = Some(batch_size);
        self
    }

    /// Include deleted rows in the output.
    ///
    /// Requires that row ids are projected (see [Self::with_row_id]); columns
    /// other than the row id / address are not meaningful for deleted rows.
    pub fn include_deleted_rows(&mut self) -> &mut Self {
        self.include_deleted_rows = true;
        self
    }

    /// Set a soft limit (in bytes) on the amount of data buffered by IO.
    ///
    /// Larger buffers allow more parallel IO at the cost of memory; the
    /// default is given by `DEFAULT_IO_BUFFER_SIZE`.
    pub fn io_buffer_size(&mut self, size: u64) -> &mut Self {
        self.io_buffer_size = Some(size);
        self
    }

    /// Set the number of batches to read ahead.
    pub fn batch_readahead(&mut self, nbatches: usize) -> &mut Self {
        self.batch_readahead = nbatches;
        self
    }

    /// Set the number of fragments to read ahead.
    pub fn fragment_readahead(&mut self, nfragments: usize) -> &mut Self {
        self.fragment_readahead = Some(nfragments);
        self
    }

    /// If true (the default), rows are returned in dataset order.
    ///
    /// Disabling ordering lets fragments be scanned in parallel and returned
    /// as they finish, which can be faster.
    pub fn scan_in_order(&mut self, ordered: bool) -> &mut Self {
        self.ordered = ordered;
        self
    }

    /// Whether scalar indexes may be used to satisfy filters (default true).
    pub fn use_scalar_index(&mut self, use_scalar_index: bool) -> &mut Self {
        self.use_scalar_index = use_scalar_index;
        self
    }

    /// If true, every batch except the last has exactly the requested batch size.
    pub fn strict_batch_size(&mut self, strict_batch_size: bool) -> &mut Self {
        self.strict_batch_size = strict_batch_size;
        self
    }

    /// Set the limit and offset for the scan.
    ///
    /// Both values must be non-negative when provided.
    pub fn limit(&mut self, limit: Option<i64>, offset: Option<i64>) -> Result<&mut Self> {
        if limit.unwrap_or_default() < 0 {
            return Err(Error::invalid_input(
                "Limit must be non-negative".to_string(),
                location!(),
            ));
        }
        if let Some(off) = offset {
            if off < 0 {
                return Err(Error::invalid_input(
                    "Offset must be non-negative".to_string(),
                    location!(),
                ));
            }
        }
        self.limit = limit;
        self.offset = offset;
        Ok(self)
    }
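    // Example (sketch): paging through results 20 rows at a time:
    //
    //     scanner.limit(Some(20), None)?;      // rows 0..20
    //     scanner.limit(Some(20), Some(20))?;  // rows 20..40
    //     scanner.limit(None, Some(40))?;      // everything after row 40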

    /// Find the `k` nearest neighbors of the query vector `q` within `column`.
    pub fn nearest(&mut self, column: &str, q: &dyn Array, k: usize) -> Result<&mut Self> {
        if !self.prefilter {
            // A post-filtered vector search is not supported on a fragment scan
            self.ensure_not_fragment_scan()?;
        }

        if k == 0 {
            return Err(Error::invalid_input(
                "k must be positive".to_string(),
                location!(),
            ));
        }
        if q.is_empty() {
            return Err(Error::invalid_input(
                "Query vector must have non-zero length".to_string(),
                location!(),
            ));
        }
        let (vector_type, element_type) = get_vector_type(self.dataset.schema(), column)?;
        let dim = get_vector_dim(self.dataset.schema(), column)?;

        let q = match q.data_type() {
            DataType::List(_) | DataType::FixedSizeList(_, _) => {
                if !matches!(vector_type, DataType::List(_)) {
                    return Err(Error::invalid_input(
                        format!(
                            "Query is multivector but column {}({}) is not multivector",
                            column, vector_type,
                        ),
                        location!(),
                    ));
                }

                if let Some(list_array) = q.as_list_opt::<i32>() {
                    for i in 0..list_array.len() {
                        let vec = list_array.value(i);
                        if vec.len() != dim {
                            return Err(Error::invalid_input(
                                format!(
                                    "query dim({}) doesn't match the column {} vector dim({})",
                                    vec.len(),
                                    column,
                                    dim,
                                ),
                                location!(),
                            ));
                        }
                    }
                    list_array.values().clone()
                } else {
                    let fsl = q.as_fixed_size_list();
                    if fsl.value_length() as usize != dim {
                        return Err(Error::invalid_input(
                            format!(
                                "query dim({}) doesn't match the column {} vector dim({})",
                                fsl.value_length(),
                                column,
                                dim,
                            ),
                            location!(),
                        ));
                    }
                    fsl.values().clone()
                }
            }
            _ => {
                if q.len() != dim {
                    return Err(Error::invalid_input(
                        format!(
                            "query dim({}) doesn't match the column {} vector dim({})",
                            q.len(),
                            column,
                            dim,
                        ),
                        location!(),
                    ));
                }
                q.slice(0, q.len())
            }
        };

        let key = match element_type {
            dt if dt == *q.data_type() => q,
            dt if dt.is_floating() => coerce_float_vector(
                q.as_any().downcast_ref::<Float32Array>().unwrap(),
                FloatType::try_from(&dt)?,
            )?,
            _ => {
                return Err(Error::invalid_input(
                    format!(
                        "Column {} has element type {} and the query vector is {}",
                        column,
                        element_type,
                        q.data_type(),
                    ),
                    location!(),
                ));
            }
        };

        self.nearest = Some(Query {
            column: column.to_string(),
            key,
            k,
            lower_bound: None,
            upper_bound: None,
            minimum_nprobes: 20,
            maximum_nprobes: None,
            ef: None,
            refine_factor: None,
            metric_type: MetricType::L2,
            use_index: true,
            dist_q_c: 0.0,
        });
        Ok(self)
    }
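    // Example (sketch): a k-nearest-neighbor search over a hypothetical vector
    // column named "vector" holding 128-dimensional embeddings:
    //
    //     let q = Float32Array::from(vec![0.1_f32; 128]);
    //     scanner
    //         .nearest("vector", &q, 10)?
    //         .distance_metric(MetricType::Cosine)
    //         .refine(2);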

    #[cfg(test)]
    fn nearest_mut(&mut self) -> Option<&mut Query> {
        self.nearest.as_mut()
    }

    /// Restrict a vector search to distances within `[lower_bound, upper_bound)`.
    pub fn distance_range(
        &mut self,
        lower_bound: Option<f32>,
        upper_bound: Option<f32>,
    ) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.lower_bound = lower_bound;
            q.upper_bound = upper_bound;
        }
        self
    }

    /// Set the number of IVF partitions to probe (sets both the minimum and
    /// the maximum to `n`).
    pub fn nprobs(&mut self, n: usize) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.minimum_nprobes = n;
            q.maximum_nprobes = Some(n);
        } else {
            log::warn!("nprobes is not set because nearest has not been called yet");
        }
        self
    }

    /// Set the minimum number of IVF partitions to probe.
    ///
    /// Probing more partitions improves recall at the cost of latency.
    pub fn minimum_nprobes(&mut self, n: usize) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.minimum_nprobes = n;
        } else {
            log::warn!("minimum_nprobes is not set because nearest has not been called yet");
        }
        self
    }

    /// Set the maximum number of IVF partitions to probe.
    ///
    /// Partitions beyond the minimum are only probed when needed, e.g. when a
    /// prefilter leaves too few results to satisfy `k`.
    pub fn maximum_nprobes(&mut self, n: usize) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.maximum_nprobes = Some(n);
        } else {
            log::warn!("maximum_nprobes is not set because nearest has not been called yet");
        }
        self
    }

    /// Set the HNSW search list size (`ef`) for a vector search.
    pub fn ef(&mut self, ef: usize) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.ef = Some(ef);
        }
        self
    }

    /// Only search the indexed data.
    ///
    /// This skips the flat search over unindexed rows, so newly added
    /// (unindexed) rows will not appear in the results.
    pub fn fast_search(&mut self) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.use_index = true;
        }
        self.fast_search = true;
        self.projection_plan.include_row_id();
        self
    }

    /// Apply a refine step to the vector search.
    ///
    /// The index retrieves `factor * k` candidates, which are re-ranked using
    /// the raw vectors before the top `k` are returned.
    pub fn refine(&mut self, factor: u32) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.refine_factor = Some(factor)
        };
        self
    }

    /// Set the distance metric for a vector search.
    pub fn distance_metric(&mut self, metric_type: MetricType) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.metric_type = metric_type
        }
        self
    }

    /// Sort the output by one or more columns.
    ///
    /// Passing `None` or an empty vector clears any previously set ordering.
    pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> Result<&mut Self> {
        if let Some(ordering) = &ordering {
            if ordering.is_empty() {
                self.ordering = None;
                return Ok(self);
            }
            // Verify up front that every ordering column exists
            for column in ordering {
                self.dataset
                    .schema()
                    .field(&column.column_name)
                    .ok_or(Error::invalid_input(
                        format!("Column {} not found", &column.column_name),
                        location!(),
                    ))?;
            }
        }
        self.ordering = ordering;
        Ok(self)
    }

    /// Whether to use the vector index for a nearest-neighbor search.
    ///
    /// If false, a flat (exhaustive) search is performed.
    pub fn use_index(&mut self, use_index: bool) -> &mut Self {
        if let Some(q) = self.nearest.as_mut() {
            q.use_index = use_index
        }
        self
    }

    /// Include the stable row id (`_rowid`) in the output.
    pub fn with_row_id(&mut self) -> &mut Self {
        self.legacy_with_row_id = true;
        self.projection_plan.include_row_id();
        self
    }

    /// Include the physical row address (`_rowaddr`) in the output.
    pub fn with_row_address(&mut self) -> &mut Self {
        self.legacy_with_row_addr = true;
        self.projection_plan.include_row_addr();
        self
    }

    /// Opt out of automatically projecting scoring columns.
    ///
    /// By default (deprecated behavior) `_distance` / `_score` are appended to
    /// explicit projections during vector / FTS searches.  After calling this,
    /// scoring columns are only returned when explicitly requested.
    pub fn disable_scoring_autoprojection(&mut self) -> &mut Self {
        self.autoproject_scoring_columns = false;
        self
    }

    /// Override the options used to read data files.
    pub fn with_file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
        self.file_reader_options = Some(options);
        self
    }

    /// Create a physical column expression, resolving nested field paths.
    fn create_column_expr(
        column_name: &str,
        dataset: &Dataset,
        arrow_schema: &ArrowSchema,
    ) -> Result<Arc<dyn PhysicalExpr>> {
        let lance_schema = dataset.schema();
        let field_path = lance_schema.resolve(column_name).ok_or_else(|| {
            Error::invalid_input(
                format!("Field '{}' not found in schema", column_name),
                location!(),
            )
        })?;

        if field_path.len() == 1 {
            // A simple top-level column
            expressions::col(&field_path[0].name, arrow_schema).map_err(|e| Error::Internal {
                message: format!(
                    "Failed to create column expression for '{}': {}",
                    column_name, e
                ),
                location: location!(),
            })
        } else {
            // A nested field: build a chain of get_field calls
            let get_field_func = ScalarUDF::from(GetFieldFunc::default());

            let mut expr = col(&field_path[0].name);
            for nested_field in &field_path[1..] {
                expr = get_field_func.call(vec![expr, lit(&nested_field.name)]);
            }

            // Convert the logical expression into a physical one
            let df_schema = Arc::new(DFSchema::try_from(arrow_schema.clone())?);
            let execution_props = ExecutionProps::new().with_query_execution_start_time(Utc::now());
            create_physical_expr(&expr, &df_schema, &execution_props).map_err(|e| Error::Internal {
                message: format!(
                    "Failed to create physical expression for nested field '{}': {}",
                    column_name, e
                ),
                location: location!(),
            })
        }
    }

    /// Whether to use statistics-based pushdown (only applies to the legacy
    /// file format).
    pub fn use_stats(&mut self, use_stats: bool) -> &mut Self {
        self.use_stats = use_stats;
        self
    }

    /// The output schema of the scan, computed by building the plan.
    pub async fn schema(&self) -> Result<SchemaRef> {
        let plan = self.create_plan().await?;
        Ok(plan.schema())
    }

    /// Parse the filter (if any) into a DataFusion [Expr].
    ///
    /// The expression is resolved against the filterable schema, which
    /// includes metadata columns in addition to the dataset columns.
    pub fn get_filter(&self) -> Result<Option<Expr>> {
        if let Some(filter) = &self.filter {
            let filter_schema = self.filterable_schema()?;
            Ok(Some(filter.to_datafusion(
                self.dataset.schema(),
                filter_schema.as_ref(),
            )?))
        } else {
            Ok(None)
        }
    }

    fn add_extra_columns(&self, schema: Schema) -> Result<Schema> {
        let mut extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)];

        if self.nearest.as_ref().is_some() {
            extra_columns.push(ArrowField::new(DIST_COL, DataType::Float32, true));
        };

        if self.full_text_query.is_some() {
            extra_columns.push(ArrowField::new(SCORE_COL, DataType::Float32, true));
        }

        schema.merge(&ArrowSchema::new(extra_columns))
    }

    /// The schema a filter may reference: all dataset columns plus the
    /// metadata columns (row id, row address, row version columns, row
    /// offset, and the distance / score columns when searching).
    fn filterable_schema(&self) -> Result<Arc<Schema>> {
        let base_schema = Projection::full(self.dataset.clone())
            .with_row_id()
            .with_row_addr()
            .with_row_last_updated_at_version()
            .with_row_created_at_version()
            .to_schema();

        Ok(Arc::new(self.add_extra_columns(base_schema)?))
    }

    /// Compute the final output projection expressions.
    ///
    /// Appends scoring columns (when autoprojection is enabled) and moves the
    /// legacy `_rowid` / `_rowaddr` columns to the end of the output.
    pub(crate) fn calculate_final_projection(
        &self,
        current_schema: &ArrowSchema,
    ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
        let mut output_expr = self.projection_plan.to_physical_exprs(current_schema)?;

        if self.autoproject_scoring_columns {
            if self.nearest.is_some() && output_expr.iter().all(|(_, name)| name != DIST_COL) {
                if self.explicit_projection {
                    log::warn!("Deprecation warning, this behavior will change in the future. This search specified output columns but did not include `_distance`. Currently the `_distance` column will be included. In the future it will not. Call `disable_scoring_autoprojection` to adopt the future behavior and avoid this warning");
                }
                let vector_expr = expressions::col(DIST_COL, current_schema)?;
                output_expr.push((vector_expr, DIST_COL.to_string()));
            }
            if self.full_text_query.is_some()
                && output_expr.iter().all(|(_, name)| name != SCORE_COL)
            {
                if self.explicit_projection {
                    log::warn!("Deprecation warning, this behavior will change in the future. This search specified output columns but did not include `_score`. Currently the `_score` column will be included. In the future it will not. Call `disable_scoring_autoprojection` to adopt the future behavior and avoid this warning");
                }
                let score_expr = expressions::col(SCORE_COL, current_schema)?;
                output_expr.push((score_expr, SCORE_COL.to_string()));
            }
        }

        if self.legacy_with_row_id {
            let row_id_pos = output_expr
                .iter()
                .position(|(_, name)| name == ROW_ID)
                .ok_or_else(|| Error::Internal {
                    message:
                        "user specified with_row_id but the _rowid column was not in the output"
                            .to_string(),
                    location: location!(),
                })?;
            if row_id_pos != output_expr.len() - 1 {
                // Legacy behavior: _rowid goes at the end of the output
                let row_id_expr = output_expr.remove(row_id_pos);
                output_expr.push(row_id_expr);
            }
        }

        if self.legacy_with_row_addr {
            let row_addr_pos = output_expr
                .iter()
                .position(|(_, name)| name == ROW_ADDR)
                .ok_or_else(|| Error::Internal {
                    message: "user specified with_row_address but the _rowaddr column was not in the output"
                        .to_string(),
                    location: location!(),
                })?;
            if row_addr_pos != output_expr.len() - 1 {
                // Legacy behavior: _rowaddr goes at the end of the output
                let row_addr_expr = output_expr.remove(row_addr_pos);
                output_expr.push(row_addr_expr);
            }
        }

        Ok(output_expr)
    }

    /// Execute the scan and return the results as a stream of record batches.
    #[instrument(skip_all)]
    pub fn try_into_stream(&self) -> BoxFuture<'_, Result<DatasetRecordBatchStream>> {
        async move {
            let plan = self.create_plan().await?;

            Ok(DatasetRecordBatchStream::new(execute_plan(
                plan,
                LanceExecutionOptions {
                    batch_size: self.batch_size,
                    execution_stats_callback: self.scan_stats_callback.clone(),
                    ..Default::default()
                },
            )?))
        }
        .boxed()
    }

    pub(crate) async fn try_into_dfstream(
        &self,
        mut options: LanceExecutionOptions,
    ) -> Result<SendableRecordBatchStream> {
        let plan = self.create_plan().await?;

        // A callback supplied in the options takes precedence over the scanner's
        if options.execution_stats_callback.is_none() {
            options.execution_stats_callback = self.scan_stats_callback.clone();
        }

        execute_plan(plan, options)
    }

    /// Execute the scan and buffer all results into a single record batch.
    pub async fn try_into_batch(&self) -> Result<RecordBatch> {
        let stream = self.try_into_stream().await?;
        let schema = stream.schema();
        let batches = stream.try_collect::<Vec<_>>().await?;
        Ok(concat_batches(&schema, &batches)?)
    }
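    // Example (sketch): consuming a scan as a stream or as one in-memory batch:
    //
    //     let mut stream = scanner.try_into_stream().await?;
    //     while let Some(batch) = stream.try_next().await? {
    //         // process each RecordBatch
    //     }
    //     // or buffer everything:
    //     let batch = scanner.try_into_batch().await?;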

    /// Create a plan that counts the rows selected by this scan.
    fn create_count_plan(&self) -> BoxFuture<'_, Result<Arc<dyn ExecutionPlan>>> {
        async move {
            // Counting rows only requires the row id; anything else is wasted IO
            if self.projection_plan.physical_projection.is_empty() {
                return Err(Error::invalid_input(
                    "count_rows called but with_row_id is false".to_string(),
                    location!(),
                ));
            }
            if !self.projection_plan.physical_projection.is_metadata_only() {
                let physical_schema = self.projection_plan.physical_projection.to_schema();
                let columns: Vec<&str> = physical_schema
                    .fields
                    .iter()
                    .map(|field| field.name.as_str())
                    .collect();

                let msg = format!(
                    "count_rows should not be called on a plan selecting columns. selected columns: [{}]",
                    columns.join(", ")
                );

                return Err(Error::invalid_input(msg, location!()));
            }

            if self.limit.is_some() || self.offset.is_some() {
                log::warn!(
                    "count_rows called with limit or offset which could have surprising results"
                );
            }

            let plan = self.create_plan().await?;
            // Count by aggregating a constant 1 for every input row
            let one = Arc::new(Literal::new(ScalarValue::UInt8(Some(1))));

            let input_phy_exprs: &[Arc<dyn PhysicalExpr>] = &[one];
            let schema = plan.schema();

            let mut builder = AggregateExprBuilder::new(count_udaf(), input_phy_exprs.to_vec());
            builder = builder.schema(schema);
            builder = builder.alias("count_rows".to_string());

            let count_expr = builder.build()?;

            let plan_schema = plan.schema();
            Ok(Arc::new(AggregateExec::try_new(
                AggregateMode::Single,
                PhysicalGroupBy::new_single(Vec::new()),
                vec![Arc::new(count_expr)],
                vec![None],
                plan,
                plan_schema,
            )?) as Arc<dyn ExecutionPlan>)
        }
        .boxed()
    }

    /// Count the number of rows this scan would return.
    #[instrument(skip_all)]
    pub fn count_rows(&self) -> BoxFuture<'_, Result<u64>> {
        async move {
            let count_plan = self.create_count_plan().await?;
            let mut stream = execute_plan(count_plan, LanceExecutionOptions::default())?;

            // The count aggregation produces a single row; an empty stream
            // means zero rows
            if let Some(first_batch) = stream.next().await {
                let batch = first_batch?;
                let array = batch
                    .column(0)
                    .as_any()
                    .downcast_ref::<Int64Array>()
                    .ok_or(Error::io(
                        "Count plan did not return an Int64Array".to_string(),
                        location!(),
                    ))?;
                Ok(array.value(0) as u64)
            } else {
                Ok(0)
            }
        }
        .boxed()
    }
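    // Example (sketch): counting the rows matching a filter without reading
    // any data columns:
    //
    //     let mut scanner = Scanner::new(dataset.clone());
    //     scanner.filter("price > 10")?.empty_project()?.with_row_id();
    //     let num_rows = scanner.count_rows().await?;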

    /// Decide whether a field should be materialized early (before the filter)
    /// or late (after the filter).
    ///
    /// Early materialization reads the column for every row, including rows
    /// the filter later drops.  Late materialization only reads matching rows
    /// but pays an extra, seek-heavy take.  With the default heuristic style,
    /// narrow columns are loaded early, and the threshold is looser on cloud
    /// storage where seeks are comparatively expensive.
    fn is_early_field(&self, field: &Field) -> bool {
        match self.materialization_style {
            MaterializationStyle::AllEarly => true,
            MaterializationStyle::AllLate => false,
            MaterializationStyle::AllEarlyExcept(ref cols) => !cols.contains(&(field.id as u32)),
            MaterializationStyle::Heuristic => {
                if field.is_blob() {
                    // Blob fields are loaded as small inline descriptions, so
                    // they are treated as early
                    return true;
                }

                let byte_width = field.data_type().byte_width_opt();
                let is_cloud = self.dataset.object_store().is_cloud();
                if is_cloud {
                    byte_width.is_some_and(|bw| bw < 1000)
                } else {
                    byte_width.is_some_and(|bw| bw < 10)
                }
            }
        }
    }

    /// Compute the projection for the eager (first) read of a filtered scan:
    /// the early-materialized output columns plus the columns the filter needs.
    fn calc_eager_projection(
        &self,
        filter_plan: &FilterPlan,
        desired_projection: &Projection,
    ) -> Result<Projection> {
        let filter_columns = filter_plan.all_columns();

        let filter_schema = self
            .dataset
            .empty_projection()
            .union_columns(filter_columns, OnMissing::Error)?
            .into_schema();
        // Non-default storage columns (e.g. blobs) cannot be filtered on
        if filter_schema.fields.iter().any(|f| !f.is_default_storage()) {
            return Err(Error::NotSupported {
                source: "non-default storage columns cannot be used as filters".into(),
                location: location!(),
            });
        }

        // Start from the desired projection, drop the late-materialized
        // fields, then add back every column the filter needs
        Ok(desired_projection
            .clone()
            .subtract_predicate(|f| !self.is_early_field(f))
            .union_schema(&filter_schema))
    }

    fn validate_options(&self) -> Result<()> {
        if self.include_deleted_rows && !self.projection_plan.physical_projection.with_row_id {
            return Err(Error::InvalidInput {
                source: "include_deleted_rows is set but with_row_id is false".into(),
                location: location!(),
            });
        }

        Ok(())
    }

    async fn create_filter_plan(&self, use_scalar_index: bool) -> Result<FilterPlan> {
        let filter_schema = self.filterable_schema()?;
        let planner = Planner::new(Arc::new(filter_schema.as_ref().into()));

        if let Some(filter) = self.filter.as_ref() {
            let filter = filter.to_datafusion(self.dataset.schema(), filter_schema.as_ref())?;
            let index_info = self.dataset.scalar_index_info().await?;
            let filter_plan =
                planner.create_filter_plan(filter.clone(), &index_info, use_scalar_index)?;

            // A scalar indexed scan needs the size of every fragment; if any
            // fragment is missing a row count, fall back to a plan that does
            // not use scalar indexes
            if filter_plan.index_query.is_some() {
                let fragments = if let Some(fragments) = self.fragments.as_ref() {
                    fragments
                } else {
                    self.dataset.fragments()
                };
                let mut has_missing_row_count = false;
                for frag in fragments {
                    if frag.physical_rows.is_none() {
                        has_missing_row_count = true;
                        break;
                    }
                }
                if has_missing_row_count {
                    Ok(planner.create_filter_plan(filter.clone(), &index_info, false)?)
                } else {
                    Ok(filter_plan)
                }
            } else {
                Ok(filter_plan)
            }
        } else {
            Ok(FilterPlan::default())
        }
    }

    async fn get_scan_range(&self, filter_plan: &FilterPlan) -> Result<Option<Range<u64>>> {
        if filter_plan.has_any_filter() {
            // A filter may drop rows, so limit/offset cannot be pushed into the scan
            Ok(None)
        } else if self.ordering.is_some() {
            // A sort reorders rows, so limit/offset cannot be pushed into the scan
            Ok(None)
        } else {
            match (self.limit, self.offset) {
                (None, None) => Ok(None),
                (Some(limit), None) => {
                    let num_rows = self.dataset.count_all_rows().await? as i64;
                    Ok(Some(0..limit.min(num_rows) as u64))
                }
                (None, Some(offset)) => {
                    let num_rows = self.dataset.count_all_rows().await? as i64;
                    Ok(Some(offset.min(num_rows) as u64..num_rows as u64))
                }
                (Some(limit), Some(offset)) => {
                    let num_rows = self.dataset.count_all_rows().await? as i64;
                    Ok(Some(
                        offset.min(num_rows) as u64..(offset + limit).min(num_rows) as u64,
                    ))
                }
            }
        }
    }
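    // Illustration (not from the original source): with no filter and no
    // ordering, limit/offset become a scan range.  For a dataset with 100 rows:
    //
    //     limit(Some(10), None)     -> scan range 0..10
    //     limit(None, Some(95))     -> scan range 95..100
    //     limit(Some(10), Some(95)) -> scan range 95..100 (clamped)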

    /// Create an [ExecutionPlan] for the scan.
    ///
    /// The plan is assembled in stages:
    ///
    /// 1. Source: a filtered read, a vector search, an FTS search, or a direct
    ///    take when the filter identifies rows by id / address / offset.
    /// 2. Take any extra columns needed by the remaining filter or ordering.
    /// 3. Apply the remaining (refine) filter, the sort, and the limit/offset.
    /// 4. Take the remaining output columns and apply the final projection.
    #[instrument(level = "debug", skip_all)]
    pub async fn create_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
        log::trace!("creating scanner plan");
        self.validate_options()?;

        // Scalar indexes only accelerate a filter that runs before (or
        // without) a vector search
        let use_scalar_index = self.use_scalar_index && (self.prefilter || self.nearest.is_none());
        let mut filter_plan = self.create_filter_plan(use_scalar_index).await?;

        // Stage 1: source
        let mut use_limit_node = true;
        let mut plan: Arc<dyn ExecutionPlan> = match (&self.nearest, &self.full_text_query) {
            (Some(_), None) => self.vector_search_source(&mut filter_plan).await?,
            (None, Some(query)) => self.fts_search_source(&mut filter_plan, query).await?,
            (None, None) => {
                if self.projection_plan.has_output_cols()
                    && self.projection_plan.physical_projection.is_empty()
                {
                    // Everything requested is a dynamic expression over no
                    // physical columns, which cannot be planned
                    let output_expr = self.calculate_final_projection(&ArrowSchema::empty())?;
                    return Err(Error::NotSupported {
                        source: format!("Scans must request at least one column. Received only dynamic expressions: {:?}", output_expr).into(),
                        location: location!(),
                    });
                }

                let take_op = filter_plan
                    .full_expr
                    .as_ref()
                    .and_then(TakeOperation::try_from_expr);
                if let Some((take_op, remainder)) = take_op {
                    // The filter selects rows directly; any remainder becomes
                    // a refine filter applied after the take
                    filter_plan = remainder
                        .map(FilterPlan::new_refine_only)
                        .unwrap_or(FilterPlan::default());
                    self.take_source(take_op).await?
                } else {
                    let planned_read = self.filtered_read_source(&mut filter_plan).await?;
                    if planned_read.limit_pushed_down {
                        use_limit_node = false;
                    }
                    if planned_read.filter_pushed_down {
                        filter_plan = FilterPlan::default();
                    }
                    planned_read.plan
                }
            }
            _ => {
                return Err(Error::InvalidInput {
                    source: "Cannot have both nearest and full text search".into(),
                    location: location!(),
                })
            }
        };

        // Stage 2: take any columns needed by the refine filter or the sort
        let mut pre_filter_projection = self.dataset.empty_projection();

        if filter_plan.has_refine() {
            pre_filter_projection = pre_filter_projection
                .union_columns(filter_plan.refine_columns(), OnMissing::Ignore)?;
        }

        if let Some(ordering) = &self.ordering {
            pre_filter_projection = pre_filter_projection.union_columns(
                ordering.iter().map(|col| &col.column_name),
                OnMissing::Error,
            )?;
        }

        plan = self.take(plan, pre_filter_projection)?;

        // Stage 3: filter, sort, limit
        if let Some(refine_expr) = filter_plan.refine_expr {
            plan = Arc::new(LanceFilterExec::try_new(refine_expr, plan)?);
        }

        if let Some(ordering) = &self.ordering {
            let ordering_columns = ordering.iter().map(|col| &col.column_name);
            let projection_with_ordering = self
                .dataset
                .empty_projection()
                .union_columns(ordering_columns, OnMissing::Error)?;
            plan = self.take(plan, projection_with_ordering)?;
            let col_exprs = ordering
                .iter()
                .map(|col| {
                    Ok(PhysicalSortExpr {
                        expr: Self::create_column_expr(
                            &col.column_name,
                            &self.dataset,
                            plan.schema().as_ref(),
                        )?,
                        options: SortOptions {
                            descending: !col.ascending,
                            nulls_first: col.nulls_first,
                        },
                    })
                })
                .collect::<Result<Vec<_>>>()?;
            plan = Arc::new(SortExec::new(
                LexOrdering::new(col_exprs)
                    .ok_or(exec_datafusion_err!("Unexpected empty sort expressions"))?,
                plan,
            ));
        }

        if use_limit_node && (self.limit.unwrap_or(0) > 0 || self.offset.is_some()) {
            plan = self.limit_node(plan);
        }

        // Stage 4: take the remaining output columns and project
        plan = self.take(plan, self.projection_plan.physical_projection.clone())?;

        if self.projection_plan.must_add_row_offset {
            plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?);
        }

        let final_projection = self.calculate_final_projection(plan.schema().as_ref())?;

        plan = Arc::new(DFProjectionExec::try_new(final_projection, plan)?);

        if self.strict_batch_size {
            plan = Arc::new(StrictBatchSizeExec::new(plan, self.get_batch_size()));
        }

        let optimizer = get_physical_optimizer();
        let options = Default::default();
        for rule in optimizer.rules {
            plan = rule.optimize(plan, &options)?;
        }

        Ok(plan)
    }

    fn filter_references_version_columns(&self, filter_plan: &FilterPlan) -> bool {
        use lance_core::{ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION};

        if let Some(refine_expr) = &filter_plan.refine_expr {
            let column_names = Planner::column_names_in_expr(refine_expr);
            for col_name in column_names {
                if col_name == ROW_CREATED_AT_VERSION || col_name == ROW_LAST_UPDATED_AT_VERSION {
                    return true;
                }
            }
        }
        false
    }

    /// Filtered read path for the legacy (v1) storage format.
    async fn legacy_filtered_read(
        &self,
        filter_plan: &FilterPlan,
        projection: Projection,
        make_deletions_null: bool,
        fragments: Option<Arc<Vec<Fragment>>>,
        scan_range: Option<Range<u64>>,
        is_prefilter: bool,
    ) -> Result<PlannedFilteredScan> {
        let fragments = fragments.unwrap_or(self.dataset.fragments().clone());
        let mut filter_pushed_down = false;

        let plan: Arc<dyn ExecutionPlan> = if filter_plan.has_index_query() {
            if self.include_deleted_rows {
                return Err(Error::InvalidInput {
                    source: "Cannot include deleted rows in a scalar indexed scan".into(),
                    location: location!(),
                });
            }
            self.scalar_indexed_scan(projection, filter_plan, fragments)
                .await
        } else if !is_prefilter
            && filter_plan.has_refine()
            && self.batch_size.is_none()
            && self.use_stats
            && !self.filter_references_version_columns(filter_plan)
        {
            filter_pushed_down = true;
            self.pushdown_scan(false, filter_plan)
        } else {
            let ordered = if self.ordering.is_some() || self.nearest.is_some() {
                // The results will be re-sorted anyway, so the scan order
                // doesn't matter
                false
            } else if projection.with_row_last_updated_at_version
                || projection.with_row_created_at_version
            {
                // Scans that compute row version metadata are kept ordered
                true
            } else {
                self.ordered
            };

            let projection = if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
                if is_prefilter {
                    let refine_cols = Planner::column_names_in_expr(refine_expr);
                    projection.union_columns(refine_cols, OnMissing::Error)?
                } else {
                    projection
                }
            } else {
                projection
            };

            // A scan range cannot be pushed down when a refine filter may drop rows
            let scan_range = if filter_plan.has_refine() {
                None
            } else {
                scan_range
            };

            let scan = self.scan_fragments(
                projection.with_row_id,
                self.projection_plan.physical_projection.with_row_addr,
                self.projection_plan
                    .physical_projection
                    .with_row_last_updated_at_version,
                self.projection_plan
                    .physical_projection
                    .with_row_created_at_version,
                make_deletions_null,
                Arc::new(projection.to_bare_schema()),
                fragments,
                scan_range,
                ordered,
            );

            if filter_plan.has_refine() && is_prefilter {
                Ok(Arc::new(LanceFilterExec::try_new(
                    filter_plan.refine_expr.clone().unwrap(),
                    scan,
                )?) as Arc<dyn ExecutionPlan>)
            } else {
                Ok(scan)
            }
        }?;
        Ok(PlannedFilteredScan {
            plan,
            limit_pushed_down: false,
            filter_pushed_down,
        })
    }

    /// Filtered read path for the v2 storage format, using [FilteredReadExec].
    async fn new_filtered_read(
        &self,
        filter_plan: &FilterPlan,
        projection: Projection,
        make_deletions_null: bool,
        fragments: Option<Arc<Vec<Fragment>>>,
        scan_range: Option<Range<u64>>,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let mut read_options = FilteredReadOptions::basic_full_read(&self.dataset)
            .with_filter_plan(filter_plan.clone())
            .with_projection(projection);

        if let Some(fragments) = fragments {
            read_options = read_options.with_fragments(fragments);
        }

        if let Some(scan_range) = scan_range {
            read_options = read_options.with_scan_range_before_filter(scan_range)?;
        }

        if let Some(batch_size) = self.batch_size {
            read_options = read_options.with_batch_size(batch_size as u32);
        }

        if let Some(fragment_readahead) = self.fragment_readahead {
            read_options = read_options.with_fragment_readahead(fragment_readahead);
        }

        if make_deletions_null {
            read_options = read_options.with_deleted_rows()?;
        }

        let index_input = filter_plan.index_query.clone().map(|index_query| {
            Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query))
                as Arc<dyn ExecutionPlan>
        });

        Ok(Arc::new(FilteredReadExec::try_new(
            self.dataset.clone(),
            read_options,
            index_input,
        )?))
    }

    /// Dispatch to the legacy or the new filtered read depending on the
    /// storage format of the dataset.
    async fn filtered_read(
        &self,
        filter_plan: &FilterPlan,
        projection: Projection,
        make_deletions_null: bool,
        fragments: Option<Arc<Vec<Fragment>>>,
        scan_range: Option<Range<u64>>,
        is_prefilter: bool,
    ) -> Result<PlannedFilteredScan> {
        if self.dataset.is_legacy_storage() {
            self.legacy_filtered_read(
                filter_plan,
                projection,
                make_deletions_null,
                fragments,
                scan_range,
                is_prefilter,
            )
            .await
        } else {
            // The new filtered read always applies the filter and honors any
            // scan range it was given
            let limit_pushed_down = scan_range.is_some();
            let plan = self
                .new_filtered_read(
                    filter_plan,
                    projection,
                    make_deletions_null,
                    fragments,
                    scan_range,
                )
                .await?;
            Ok(PlannedFilteredScan {
                filter_pushed_down: true,
                limit_pushed_down,
                plan,
            })
        }
    }

    fn u64s_as_take_input(&self, u64s: Vec<u64>) -> Result<Arc<dyn ExecutionPlan>> {
        // Wrap the row ids in an exact index-expression result so they can be
        // fed to FilteredReadExec as its index input
        let row_ids = RowIdTreeMap::from_iter(u64s);
        let row_id_mask = RowIdMask::from_allowed(row_ids);
        let index_result = IndexExprResult::Exact(row_id_mask);
        let fragments_covered =
            RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32));
        let batch = index_result.serialize_to_arrow(&fragments_covered)?;
        let stream = futures::stream::once(async move { Ok(batch) });
        let stream = Box::pin(RecordBatchStreamAdapter::new(
            INDEX_EXPR_RESULT_SCHEMA.clone(),
            stream,
        ));
        Ok(Arc::new(OneShotExec::new(stream)))
    }

    async fn take_source(&self, take_op: TakeOperation) -> Result<Arc<dyn ExecutionPlan>> {
        let projection = self.projection_plan.physical_projection.clone();

        let input = match take_op {
            TakeOperation::RowIds(ids) => self.u64s_as_take_input(ids),
            TakeOperation::RowAddrs(addrs) => self.u64s_as_take_input(addrs),
            TakeOperation::RowOffsets(offsets) => {
                let mut addrs =
                    row_offsets_to_row_addresses(self.dataset.as_ref(), &offsets).await?;
                addrs.retain(|addr| *addr != RowAddress::TOMBSTONE_ROW);
                self.u64s_as_take_input(addrs)
            }
        }?;

        Ok(Arc::new(FilteredReadExec::try_new(
            self.dataset.clone(),
            FilteredReadOptions::new(projection),
            Some(input),
        )?))
    }

    async fn filtered_read_source(
        &self,
        filter_plan: &mut FilterPlan,
    ) -> Result<PlannedFilteredScan> {
        log::trace!("source is a filtered read");
        let mut projection = if filter_plan.has_refine() {
            // A refine filter runs against this read, so eagerly load the
            // early-materialized output columns plus the filter columns
            self.calc_eager_projection(filter_plan, &self.projection_plan.physical_projection)?
                .with_row_id()
        } else {
            // No refine filter: read exactly the physical projection
            self.projection_plan.physical_projection.clone()
        };

        if projection.is_empty() {
            // The scan must read at least something; fall back to the row address
            projection.with_row_addr = true;
        }

        let scan_range = if filter_plan.is_empty() {
            log::trace!("pushing scan_range into filtered_read");
            self.get_scan_range(filter_plan).await?
        } else {
            None
        };

        self.filtered_read(
            filter_plan,
            projection,
            self.include_deleted_rows,
            self.fragments.clone().map(Arc::new),
            scan_range,
            false,
        )
        .await
    }

    async fn fts_search_source(
        &self,
        filter_plan: &mut FilterPlan,
        query: &FullTextSearchQuery,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        log::trace!("source is an fts search");
        if self.include_deleted_rows {
            return Err(Error::InvalidInput {
                source: "Cannot include deleted rows in an FTS search".into(),
                location: location!(),
            });
        }

        if self.prefilter {
            // The filter is consumed as the search prefilter; nothing remains
            // to apply afterwards
            let source = self.fts(filter_plan, query).await?;
            *filter_plan = FilterPlan::default();
            Ok(source)
        } else {
            // Postfilter: the filter runs after the search, so it can only be
            // a refine filter
            filter_plan.make_refine_only();
            self.fts(&FilterPlan::default(), query).await
        }
    }

    async fn vector_search_source(
        &self,
        filter_plan: &mut FilterPlan,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        if self.include_deleted_rows {
            return Err(Error::InvalidInput {
                source: "Cannot include deleted rows in a nearest neighbor search".into(),
                location: location!(),
            });
        }

        if self.prefilter {
            log::trace!("source is a vector search (prefilter)");
            let source = self.vector_search(filter_plan).await?;
            *filter_plan = FilterPlan::default();
            Ok(source)
        } else {
            log::trace!("source is a vector search (postfilter)");
            filter_plan.make_refine_only();
            self.vector_search(&FilterPlan::default()).await
        }
    }

    async fn fragments_covered_by_fts_leaf(
        &self,
        column: &str,
        accum: &mut RoaringBitmap,
    ) -> Result<bool> {
        let index = self
            .dataset
            .load_scalar_index(
                ScalarIndexCriteria::default()
                    .for_column(column)
                    .supports_fts(),
            )
            .await?;
        match index {
            Some(index) => match &index.fragment_bitmap {
                Some(fragmap) => {
                    *accum |= fragmap;
                    Ok(true)
                }
                None => Ok(false),
            },
            None => Ok(false),
        }
    }

    /// Accumulate the fragments covered by the FTS indexes referenced by the
    /// query, returning false if coverage cannot be determined.
    #[async_recursion]
    async fn fragments_covered_by_fts_query_helper(
        &self,
        query: &FtsQuery,
        accum: &mut RoaringBitmap,
    ) -> Result<bool> {
        match query {
            FtsQuery::Match(match_query) => {
                self.fragments_covered_by_fts_leaf(
                    match_query.column.as_ref().ok_or(Error::invalid_input(
                        "the column must be specified in the query".to_string(),
                        location!(),
                    ))?,
                    accum,
                )
                .await
            }
            FtsQuery::Boost(boost) => Ok(self
                .fragments_covered_by_fts_query_helper(&boost.negative, accum)
                .await?
                & self
                    .fragments_covered_by_fts_query_helper(&boost.positive, accum)
                    .await?),
            FtsQuery::MultiMatch(multi_match) => {
                for mq in &multi_match.match_queries {
                    if !self
                        .fragments_covered_by_fts_leaf(
                            mq.column.as_ref().ok_or(Error::invalid_input(
                                "the column must be specified in the query".to_string(),
                                location!(),
                            ))?,
                            accum,
                        )
                        .await?
                    {
                        return Ok(false);
                    }
                }
                Ok(true)
            }
            FtsQuery::Phrase(phrase_query) => {
                self.fragments_covered_by_fts_leaf(
                    phrase_query.column.as_ref().ok_or(Error::invalid_input(
                        "the column must be specified in the query".to_string(),
                        location!(),
                    ))?,
                    accum,
                )
                .await
            }
            FtsQuery::Boolean(bool_query) => {
                for query in bool_query.must.iter() {
                    if !self
                        .fragments_covered_by_fts_query_helper(query, accum)
                        .await?
                    {
                        return Ok(false);
                    }
                }
                for query in &bool_query.should {
                    if !self
                        .fragments_covered_by_fts_query_helper(query, accum)
                        .await?
                    {
                        return Ok(false);
                    }
                }
                Ok(true)
            }
        }
    }

    async fn fragments_covered_by_fts_query(&self, query: &FtsQuery) -> Result<RoaringBitmap> {
        let all_fragments = self.get_fragments_as_bitmap();

        let mut referenced_fragments = RoaringBitmap::new();
        if !self
            .fragments_covered_by_fts_query_helper(query, &mut referenced_fragments)
            .await?
        {
            // Coverage is unknown for part of the query; fall back to all fragments
            Ok(all_fragments)
        } else {
            // Restrict to fragments actually covered by the FTS indexes
            Ok(all_fragments & referenced_fragments)
        }
    }

    /// Build the source plan for a full text search.
    async fn fts(
        &self,
        filter_plan: &FilterPlan,
        query: &FullTextSearchQuery,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let columns = query.columns();
        let mut params = query.params();
        if params.limit.is_none() {
            let search_limit = match (self.limit, self.offset) {
                (Some(limit), Some(offset)) => Some((limit + offset) as usize),
                (Some(limit), None) => Some(limit as usize),
                // An offset without a limit puts no bound on the result count
                (None, Some(_)) => None,
                (None, None) => None,
            };
            params = params.with_limit(search_limit);
        }
        let query = if columns.is_empty() {
            // No columns were specified: search every string column that has
            // an FTS index
            let mut indexed_columns = Vec::new();
            for field in self.dataset.schema().fields_pre_order() {
                let is_string_field = match field.data_type() {
                    DataType::Utf8 | DataType::LargeUtf8 => true,
                    DataType::List(inner_field) | DataType::LargeList(inner_field) => {
                        matches!(
                            inner_field.data_type(),
                            DataType::Utf8 | DataType::LargeUtf8
                        )
                    }
                    _ => false,
                };

                if is_string_field {
                    let column_path = if let Some(ancestors) =
                        self.dataset.schema().field_ancestry_by_id(field.id)
                    {
                        let field_refs: Vec<&str> =
                            ancestors.iter().map(|f| f.name.as_str()).collect();
                        format_field_path(&field_refs)
                    } else {
                        // No ancestry found for the field; skip it
                        continue;
                    };

                    let has_fts_index = self
                        .dataset
                        .load_scalar_index(
                            ScalarIndexCriteria::default()
                                .for_column(&column_path)
                                .supports_fts(),
                        )
                        .await?
                        .is_some();

                    if has_fts_index {
                        indexed_columns.push(column_path);
                    }
                }
            }

            fill_fts_query_column(&query.query, &indexed_columns, false)?
        } else {
            query.query.clone()
        };

        // Restrict the prefilter to the fragments actually covered by the FTS
        // indexes used by the query
        let prefilter_source = self
            .prefilter_source(
                filter_plan,
                self.fragments_covered_by_fts_query(&query).await?,
            )
            .await?;
        let fts_exec = self
            .plan_fts(&query, &params, filter_plan, &prefilter_source)
            .await?;
        Ok(fts_exec)
    }
2440
2441 async fn plan_fts(
2442 &self,
2443 query: &FtsQuery,
2444 params: &FtsSearchParams,
2445 filter_plan: &FilterPlan,
2446 prefilter_source: &PreFilterSource,
2447 ) -> Result<Arc<dyn ExecutionPlan>> {
2448 let plan: Arc<dyn ExecutionPlan> = match query {
2449 FtsQuery::Match(query) => {
2450 self.plan_match_query(query, params, filter_plan, prefilter_source)
2451 .await?
2452 }
2453 FtsQuery::Phrase(query) => {
2454 self.plan_phrase_query(query, params, prefilter_source)
2455 .await?
2456 }
2457
2458 FtsQuery::Boost(query) => {
2459 let unlimited_params = params.clone().with_limit(None);
2463 let positive_exec = Box::pin(self.plan_fts(
2464 &query.positive,
2465 &unlimited_params,
2466 filter_plan,
2467 prefilter_source,
2468 ));
2469 let negative_exec = Box::pin(self.plan_fts(
2470 &query.negative,
2471 &unlimited_params,
2472 filter_plan,
2473 prefilter_source,
2474 ));
2475 let (positive_exec, negative_exec) =
2476 futures::future::try_join(positive_exec, negative_exec).await?;
2477 Arc::new(BoostQueryExec::new(
2478 query.clone(),
2479 params.clone(),
2480 positive_exec,
2481 negative_exec,
2482 ))
2483 }
2484
2485 FtsQuery::MultiMatch(query) => {
2486 let mut children = Vec::with_capacity(query.match_queries.len());
2487 for match_query in &query.match_queries {
2488 let child =
2489 self.plan_match_query(match_query, params, filter_plan, prefilter_source);
2490 children.push(child);
2491 }
2492 let children = futures::future::try_join_all(children).await?;
2493
2494 let schema = children[0].schema();
2495 let group_expr = vec![(
2496 expressions::col(ROW_ID, schema.as_ref())?,
2497 ROW_ID.to_string(),
2498 )];
2499
2500 let fts_node = Arc::new(UnionExec::new(children));
2501 let fts_node = Arc::new(RepartitionExec::try_new(
2502 fts_node,
2503 Partitioning::RoundRobinBatch(1),
2504 )?);
2505 let fts_node = Arc::new(AggregateExec::try_new(
2507 AggregateMode::Single,
2508 PhysicalGroupBy::new_single(group_expr),
2509 vec![Arc::new(
2510 AggregateExprBuilder::new(
2511 functions_aggregate::min_max::max_udaf(),
2512 vec![expressions::col(SCORE_COL, &schema)?],
2513 )
2514 .schema(schema.clone())
2515 .alias(SCORE_COL)
2516 .build()?,
2517 )],
2518 vec![None],
2519 fts_node,
2520 schema,
2521 )?);
2522 let sort_expr = PhysicalSortExpr {
2523 expr: expressions::col(SCORE_COL, fts_node.schema().as_ref())?,
2524 options: SortOptions {
2525 descending: true,
2526 nulls_first: false,
2527 },
2528 };
2529
2530 Arc::new(
2531 SortExec::new([sort_expr].into(), fts_node)
2532 .with_fetch(self.limit.map(|l| l as usize)),
2533 )
2534 }
2535 FtsQuery::Boolean(query) => {
2536 let unlimited_params = params.clone().with_limit(None);
2541
2542 let mut should = Vec::with_capacity(query.should.len());
2544 for subquery in &query.should {
2545 let plan = Box::pin(self.plan_fts(
2546 subquery,
2547 &unlimited_params,
2548 filter_plan,
2549 prefilter_source,
2550 ))
2551 .await?;
2552 should.push(plan);
2553 }
2554 let should = if should.is_empty() {
2555 Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
2556 } else if should.len() == 1 {
2557 should.pop().unwrap()
2558 } else {
2559 let unioned = Arc::new(UnionExec::new(should));
2560 Arc::new(RepartitionExec::try_new(
2561 unioned,
2562 Partitioning::RoundRobinBatch(1),
2563 )?)
2564 };
2565
2566 let mut must = None;
2568 for query in &query.must {
2569 let plan = Box::pin(self.plan_fts(
2570 query,
2571 &unlimited_params,
2572 filter_plan,
2573 prefilter_source,
2574 ))
2575 .await?;
2576 if let Some(joined_plan) = must {
2577 must = Some(Arc::new(HashJoinExec::try_new(
2578 joined_plan,
2579 plan,
2580 vec![(
2581 Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
2582 Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
2583 )],
2584 None,
2585 &datafusion_expr::JoinType::Inner,
2586 None,
2587 datafusion_physical_plan::joins::PartitionMode::CollectLeft,
2588 NullEquality::NullEqualsNothing,
2589 )?) as _);
2590 } else {
2591 must = Some(plan);
2592 }
2593 }
2594
2595 let mut must_not = Vec::with_capacity(query.must_not.len());
2597 for query in &query.must_not {
2598 let plan = Box::pin(self.plan_fts(
2599 query,
2600 &unlimited_params,
2601 filter_plan,
2602 prefilter_source,
2603 ))
2604 .await?;
2605 must_not.push(plan);
2606 }
2607 let must_not = if must_not.is_empty() {
2608 Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
2609 } else if must_not.len() == 1 {
2610 must_not.pop().unwrap()
2611 } else {
2612 let unioned = Arc::new(UnionExec::new(must_not));
2613 Arc::new(RepartitionExec::try_new(
2614 unioned,
2615 Partitioning::RoundRobinBatch(1),
2616 )?)
2617 };
2618
2619 if query.should.is_empty() && must.is_none() {
2620 return Err(Error::invalid_input(
2621 "boolean query must have at least one should/must query".to_string(),
2622 location!(),
2623 ));
2624 }
2625
2626 Arc::new(BooleanQueryExec::new(
2627 query.clone(),
2628 params.clone(),
2629 should,
2630 must,
2631 must_not,
2632 ))
2633 }
2634 };
2635
2636 Ok(plan)
2637 }
2638
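/// Build the execution plan for a phrase query.
///
/// Requires an inverted index on the target column that was created with
/// positions recorded.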
2639 async fn plan_phrase_query(
2640 &self,
2641 query: &PhraseQuery,
2642 params: &FtsSearchParams,
2643 prefilter_source: &PreFilterSource,
2644 ) -> Result<Arc<dyn ExecutionPlan>> {
2645 let column = query.column.clone().ok_or(Error::invalid_input(
2646 "the column must be specified in the query".to_string(),
2647 location!(),
2648 ))?;
2649
2650 let index_meta = self
2651 .dataset
2652 .load_scalar_index(
2653 ScalarIndexCriteria::default()
2654 .for_column(&column)
2655 .supports_fts(),
2656 )
2657 .await?
2658 .ok_or(Error::invalid_input(
2659 format!("No Inverted index found for column {}", column),
2660 location!(),
2661 ))?;
2662
2663 let details_any =
2664 crate::index::scalar::fetch_index_details(&self.dataset, &column, &index_meta).await?;
2665 let details = details_any
2666 .as_ref()
2667 .to_msg::<lance_index::pbold::InvertedIndexDetails>()?;
2668 if !details.with_position {
2669 return Err(Error::invalid_input(
2670 "position is not found but required for phrase queries, try recreating the index with position"
2671 .to_string(),
2672 location!(),
2673 ));
2674 }
2675
2676 Ok(Arc::new(PhraseQueryExec::new(
2677 self.dataset.clone(),
2678 query.clone(),
2679 params.clone(),
2680 prefilter_source.clone(),
2681 )))
2682 }
2683
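/// Build the execution plan for a match query, searching the inverted
/// index when one exists and falling back to (or unioning in) a flat
/// match over any unindexed fragments.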
2684 async fn plan_match_query(
2685 &self,
2686 query: &MatchQuery,
2687 params: &FtsSearchParams,
2688 filter_plan: &FilterPlan,
2689 prefilter_source: &PreFilterSource,
2690 ) -> Result<Arc<dyn ExecutionPlan>> {
2691 let column = query
2692 .column
2693 .as_ref()
2694 .ok_or(Error::invalid_input(
2695 "the column must be specified in the query".to_string(),
2696 location!(),
2697 ))?
2698 .clone();
2699
2700 let index = self
2701 .dataset
2702 .load_scalar_index(
2703 ScalarIndexCriteria::default()
2704 .for_column(&column)
2705 .supports_fts(),
2706 )
2707 .await?;
2708
2709 let (match_plan, flat_match_plan) = match &index {
2710 Some(index) => {
2711 let match_plan: Arc<dyn ExecutionPlan> = Arc::new(MatchQueryExec::new(
2712 self.dataset.clone(),
2713 query.clone(),
2714 params.clone(),
2715 prefilter_source.clone(),
2716 ));
2717
2718 let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
2719 if unindexed_fragments.is_empty() {
2720 (Some(match_plan), None)
2721 } else {
2722 let flat_match_plan = self
2723 .plan_flat_match_query(unindexed_fragments, query, params, filter_plan)
2724 .await?;
2725 (Some(match_plan), Some(flat_match_plan))
2726 }
2727 }
2728 None => {
2729 let unindexed_fragments = self.dataset.fragments().iter().cloned().collect();
2730 let flat_match_plan = self
2731 .plan_flat_match_query(unindexed_fragments, query, params, filter_plan)
2732 .await?;
2733 (None, Some(flat_match_plan))
2734 }
2735 };
2736
2737 let plan = match (match_plan, flat_match_plan) {
2739 (Some(match_plan), Some(flat_match_plan)) => {
2740 let match_plan = Arc::new(UnionExec::new(vec![match_plan, flat_match_plan]));
2741 let match_plan = Arc::new(RepartitionExec::try_new(
2742 match_plan,
2743 Partitioning::RoundRobinBatch(1),
2744 )?);
2745 let sort_expr = PhysicalSortExpr {
2746 expr: expressions::col(SCORE_COL, match_plan.schema().as_ref())?,
2747 options: SortOptions {
2748 descending: true,
2749 nulls_first: false,
2750 },
2751 };
2752 Arc::new(SortExec::new([sort_expr].into(), match_plan).with_fetch(params.limit))
2753 }
2754 (Some(match_plan), None) => match_plan,
2755 (None, Some(flat_match_plan)) => flat_match_plan,
2756 (None, None) => unreachable!(),
2757 };
2758
2759 Ok(plan)
2760 }
2761
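/// Build a flat (brute-force) match plan over the given fragments,
/// applying the filter before scoring.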
2762 async fn plan_flat_match_query(
2764 &self,
2765 fragments: Vec<Fragment>,
2766 query: &MatchQuery,
2767 params: &FtsSearchParams,
2768 filter_plan: &FilterPlan,
2769 ) -> Result<Arc<dyn ExecutionPlan>> {
2770 let column = query
2771 .column
2772 .as_ref()
2773 .ok_or(Error::invalid_input(
2774 "the column must be specified in the query".to_string(),
2775 location!(),
2776 ))?
2777 .clone();
2778
2779 let mut columns = vec![column];
2780 if let Some(expr) = filter_plan.full_expr.as_ref() {
2781 let filter_columns = Planner::column_names_in_expr(expr);
2782 columns.extend(filter_columns);
2783 }
2784 let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns)?);
2785 let mut scan_node = self.scan_fragments(
2786 true,
2787 false,
2788 false,
2789 false,
2790 false,
2791 flat_fts_scan_schema,
2792 Arc::new(fragments),
2793 None,
2794 false,
2795 );
2796
2797 if let Some(expr) = filter_plan.full_expr.as_ref() {
2798 scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
2800 }
2801
2802 let flat_match_plan = Arc::new(FlatMatchQueryExec::new(
2803 self.dataset.clone(),
2804 query.clone(),
2805 params.clone(),
2806 scan_node,
2807 ));
2808 Ok(flat_match_plan)
2809 }
2810
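/// Build the execution plan for the nearest-vector query.
///
/// Uses a vector index when one covers the query column; otherwise runs a
/// filtered scan followed by a flat KNN.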
2811 async fn vector_search(&self, filter_plan: &FilterPlan) -> Result<Arc<dyn ExecutionPlan>> {
2813 let Some(q) = self.nearest.as_ref() else {
2814 return Err(Error::invalid_input(
2815 "No nearest query".to_string(),
2816 location!(),
2817 ));
2818 };
2819
2820 let (vector_type, _) = get_vector_type(self.dataset.schema(), &q.column)?;
2822
2823 let column_id = self.dataset.schema().field_id(q.column.as_str())?;
2824 let use_index = q.use_index;
2825 let indices = if use_index {
2826 self.dataset.load_indices().await?
2827 } else {
2828 Arc::new(vec![])
2829 };
2830 if let Some(index) = indices.iter().find(|i| i.fields.contains(&column_id)) {
2831 log::trace!("index found for vector search");
2832 if matches!(q.refine_factor, Some(0)) {
2835 return Err(Error::invalid_input(
2836 "Refine factor cannot be zero".to_string(),
2837 location!(),
2838 ));
2839 }
2840
2841 let deltas = self.dataset.load_indices_by_name(&index.name).await?;
2843 let ann_node = match vector_type {
2844 DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?,
2845 DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?,
2846 _ => unreachable!(),
2847 };
2848
2849 let mut knn_node = if q.refine_factor.is_some() {
2850 let vector_projection = self
2851 .dataset
2852 .empty_projection()
2853 .union_column(&q.column, OnMissing::Error)
2854 .unwrap();
2855 let knn_node_with_vector = self.take(ann_node, vector_projection)?;
2856 let idx = self
2858 .dataset
2859 .open_vector_index(
2860 q.column.as_str(),
2861 &index.uuid.to_string(),
2862 &NoOpMetricsCollector,
2863 )
2864 .await?;
2865 let mut q = q.clone();
2866 q.metric_type = idx.metric_type();
2867 self.flat_knn(knn_node_with_vector, &q)?
2868 } else {
2869 ann_node
2870 };
2872 if !self.fast_search {
2873 knn_node = self.knn_combined(q, index, knn_node, filter_plan).await?;
2874 }
2875
2876 Ok(knn_node)
2877 } else {
2878 let mut columns = vec![q.column.clone()];
2880 if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
2881 columns.extend(Planner::column_names_in_expr(refine_expr));
2882 }
2883 let mut vector_scan_projection = self
2884 .dataset
2885 .empty_projection()
2886 .with_row_id()
2887 .union_columns(&columns, OnMissing::Error)?;
2888
2889 vector_scan_projection.with_row_addr =
2890 self.projection_plan.physical_projection.with_row_addr;
2891
2892 let PlannedFilteredScan { mut plan, .. } = self
2893 .filtered_read(
2894 filter_plan,
2895 vector_scan_projection,
2896 true,
2897 None,
2898 None,
2899 true,
2900 )
2901 .await?;
2902
2903 if let Some(refine_expr) = &filter_plan.refine_expr {
2904 plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?);
2905 }
2906 Ok(self.flat_knn(plan, q)?)
2907 }
2908 }
2909
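/// Combine the indexed KNN results with a flat KNN over any fragments the
/// index does not cover, then re-rank the union to the final top-k.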
2910 async fn knn_combined(
2912 &self,
2913 q: &Query,
2914 index: &IndexMetadata,
2915 mut knn_node: Arc<dyn ExecutionPlan>,
2916 filter_plan: &FilterPlan,
2917 ) -> Result<Arc<dyn ExecutionPlan>> {
2918 let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
2920 if !unindexed_fragments.is_empty() {
2921 let idx = self
2924 .dataset
2925 .open_vector_index(
2926 q.column.as_str(),
2927 &index.uuid.to_string(),
2928 &NoOpMetricsCollector,
2929 )
2930 .await?;
2931 let mut q = q.clone();
2932 q.metric_type = idx.metric_type();
2933
2934 if knn_node.schema().column_with_name(&q.column).is_none() {
2937 let vector_projection = self
2938 .dataset
2939 .empty_projection()
2940 .union_column(&q.column, OnMissing::Error)
2941 .unwrap();
2942 knn_node = self.take(knn_node, vector_projection)?;
2943 }
2944
2945 let mut columns = vec![q.column.clone()];
2946 if let Some(expr) = filter_plan.full_expr.as_ref() {
2947 let filter_columns = Planner::column_names_in_expr(expr);
2948 columns.extend(filter_columns);
2949 }
2950 let vector_scan_projection = Arc::new(self.dataset.schema().project(&columns)?);
2951 let mut scan_node = self.scan_fragments(
2955 true,
2956 false,
2957 false,
2958 false,
2959 false,
2960 vector_scan_projection,
2961 Arc::new(unindexed_fragments),
2962 None,
2964 false,
2967 );
2968
2969 if let Some(expr) = filter_plan.full_expr.as_ref() {
2970 scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
2972 }
2973 let topk_appended = self.flat_knn(scan_node, &q)?;
2975
2976 let topk_appended = project(topk_appended, knn_node.schema().as_ref())?;
2980 assert!(topk_appended
2981 .schema()
2982 .equivalent_names_and_types(&knn_node.schema()));
2983 let unioned = UnionExec::new(vec![Arc::new(topk_appended), knn_node]);
2985 let unioned = RepartitionExec::try_new(
2987 Arc::new(unioned),
2988 datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
2989 )?;
2990 return self.flat_knn(Arc::new(unioned), &q);
2992 }
2993
2994 Ok(knn_node)
2995 }
2996
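/// Compute the set of fragments that the given scalar index query is able
/// to answer.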
2997 #[async_recursion]
2998 async fn fragments_covered_by_index_query(
2999 &self,
3000 index_expr: &ScalarIndexExpr,
3001 ) -> Result<RoaringBitmap> {
3002 match index_expr {
3003 ScalarIndexExpr::And(lhs, rhs) => {
3004 Ok(self.fragments_covered_by_index_query(lhs).await?
3005 & self.fragments_covered_by_index_query(rhs).await?)
3006 }
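// `&` is deliberate here: an OR is only answerable from the indices for
// fragments that both operands cover.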
3007 ScalarIndexExpr::Or(lhs, rhs) => Ok(self.fragments_covered_by_index_query(lhs).await?
3008 & self.fragments_covered_by_index_query(rhs).await?),
3009 ScalarIndexExpr::Not(expr) => self.fragments_covered_by_index_query(expr).await,
3010 ScalarIndexExpr::Query(search) => {
3011 let idx = self
3012 .dataset
3013 .load_scalar_index(ScalarIndexCriteria::default().with_name(&search.index_name))
3014 .await?
3015 .expect("Index not found even though it must have been found earlier");
3016 Ok(idx
3017 .fragment_bitmap
3018 .expect("scalar indices should always have a fragment bitmap"))
3019 }
3020 }
3021 }
3022
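/// Split `fragments` into those covered by the index query and those that
/// will need a regular scan.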
3023 async fn partition_frags_by_coverage(
3031 &self,
3032 index_expr: &ScalarIndexExpr,
3033 fragments: Arc<Vec<Fragment>>,
3034 ) -> Result<(Vec<Fragment>, Vec<Fragment>)> {
3035 let covered_frags = self.fragments_covered_by_index_query(index_expr).await?;
3036 let mut relevant_frags = Vec::with_capacity(fragments.len());
3037 let mut missing_frags = Vec::with_capacity(fragments.len());
3038 for fragment in fragments.iter() {
3039 if covered_frags.contains(fragment.id as u32) {
3040 relevant_frags.push(fragment.clone());
3041 } else {
3042 missing_frags.push(fragment.clone());
3043 }
3044 }
3045 Ok((relevant_frags, missing_frags))
3046 }
3047
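/// Build a scan that materializes matching rows from a scalar index,
/// taking extra columns and re-applying filters where needed, and unions
/// in a filtered scan of any fragments the index does not cover.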
3048 async fn scalar_indexed_scan(
3051 &self,
3052 projection: Projection,
3053 filter_plan: &FilterPlan,
3054 fragments: Arc<Vec<Fragment>>,
3055 ) -> Result<Arc<dyn ExecutionPlan>> {
3056 log::trace!("scalar indexed scan");
3057 let index_expr = filter_plan.index_query.as_ref().unwrap();
3064
3065 let needs_recheck = index_expr.needs_recheck();
3066
3067 let (relevant_frags, missing_frags) = self
3069 .partition_frags_by_coverage(index_expr, fragments)
3070 .await?;
3071
3072 let mut plan: Arc<dyn ExecutionPlan> = Arc::new(MaterializeIndexExec::new(
3073 self.dataset.clone(),
3074 index_expr.clone(),
3075 Arc::new(relevant_frags),
3076 ));
3077
3078 let refine_expr = filter_plan.refine_expr.as_ref();
3079
3080 let needs_take =
3083 needs_recheck || projection.has_data_fields() || filter_plan.refine_expr.is_some();
3084 if needs_take {
3085 let mut take_projection = projection.clone();
3086 if needs_recheck {
3087 let filter_expr = index_expr.to_expr();
3089 let filter_cols = Planner::column_names_in_expr(&filter_expr);
3090 take_projection = take_projection.union_columns(filter_cols, OnMissing::Error)?;
3091 }
3092 if let Some(refine_expr) = refine_expr {
3093 let refine_cols = Planner::column_names_in_expr(refine_expr);
3094 take_projection = take_projection.union_columns(refine_cols, OnMissing::Error)?;
3095 }
3096 log::trace!("need to take additional columns for scalar_indexed_scan");
3097 plan = self.take(plan, take_projection)?;
3098 }
3099
3100 let post_take_filter = match (needs_recheck, refine_expr) {
3101 (false, None) => None,
3102 (true, None) => {
3103 Some(index_expr.to_expr())
3105 }
3106 (true, Some(_)) => Some(filter_plan.full_expr.as_ref().unwrap().clone()),
3107 (false, Some(refine_expr)) => Some(refine_expr.clone()),
3108 };
3109
3110 if let Some(post_take_filter) = post_take_filter {
3111 let planner = Planner::new(plan.schema());
3112 let optimized_filter = planner.optimize_expr(post_take_filter)?;
3113
3114 log::trace!("applying post-take filter to indexed scan");
3115 plan = Arc::new(LanceFilterExec::try_new(optimized_filter, plan)?);
3116 }
3117
3118 if self.projection_plan.physical_projection.with_row_addr {
3119 plan = Arc::new(AddRowAddrExec::try_new(plan, self.dataset.clone(), 0)?);
3120 }
3121
3122 let new_data_path: Option<Arc<dyn ExecutionPlan>> = if !missing_frags.is_empty() {
3123 log::trace!(
3124 "scalar_indexed_scan will need full scan of {} missing fragments",
3125 missing_frags.len()
3126 );
3127
3128 let filter = filter_plan.full_expr.as_ref().unwrap();
3141 let filter_cols = Planner::column_names_in_expr(filter);
3142 let scan_projection = projection.union_columns(filter_cols, OnMissing::Error)?;
3143
3144 let scan_schema = Arc::new(scan_projection.to_bare_schema());
3145 let scan_arrow_schema = Arc::new(scan_schema.as_ref().into());
3146 let planner = Planner::new(scan_arrow_schema);
3147 let optimized_filter = planner.optimize_expr(filter.clone())?;
3148
3149 let new_data_scan = self.scan_fragments(
3150 true,
3151 self.projection_plan.physical_projection.with_row_addr,
3152 self.projection_plan
3153 .physical_projection
3154 .with_row_last_updated_at_version,
3155 self.projection_plan
3156 .physical_projection
3157 .with_row_created_at_version,
3158 false,
3159 scan_schema,
3160 missing_frags.into(),
3161 None,
3163 false,
3164 );
3165 let filtered = Arc::new(LanceFilterExec::try_new(optimized_filter, new_data_scan)?);
3166 Some(Arc::new(project(filtered, plan.schema().as_ref())?))
3167 } else {
3168 log::trace!("scalar_indexed_scan will not need full scan of any missing fragments");
3169 None
3170 };
3171
3172 if let Some(new_data_path) = new_data_path {
3173 let unioned = UnionExec::new(vec![plan, new_data_path]);
3174 let unioned = RepartitionExec::try_new(
3176 Arc::new(unioned),
3177 datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
3178 )?;
3179 Ok(Arc::new(unioned))
3180 } else {
3181 Ok(plan)
3182 }
3183 }
3184
3185 fn get_io_buffer_size(&self) -> u64 {
3186 self.io_buffer_size.unwrap_or(*DEFAULT_IO_BUFFER_SIZE)
3187 }
3188
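/// Build a plain scan over all targeted fragments, disabling ordered
/// output when a sort or nearest query will reorder the rows anyway.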
3189 #[allow(clippy::too_many_arguments)]
3194 pub(crate) fn scan(
3195 &self,
3196 with_row_id: bool,
3197 with_row_address: bool,
3198 with_row_last_updated_at_version: bool,
3199 with_row_created_at_version: bool,
3200 with_make_deletions_null: bool,
3201 range: Option<Range<u64>>,
3202 projection: Arc<Schema>,
3203 ) -> Arc<dyn ExecutionPlan> {
3204 let fragments = if let Some(fragment) = self.fragments.as_ref() {
3205 Arc::new(fragment.clone())
3206 } else {
3207 self.dataset.fragments().clone()
3208 };
3209 let ordered = if self.ordering.is_some() || self.nearest.is_some() {
3210 false
3212 } else {
3213 self.ordered
3214 };
3215 self.scan_fragments(
3216 with_row_id,
3217 with_row_address,
3218 with_row_last_updated_at_version,
3219 with_row_created_at_version,
3220 with_make_deletions_null,
3221 projection,
3222 fragments,
3223 range,
3224 ordered,
3225 )
3226 }
3227
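/// Build a scan over an explicit set of fragments.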
3228 #[allow(clippy::too_many_arguments)]
3229 fn scan_fragments(
3230 &self,
3231 with_row_id: bool,
3232 with_row_address: bool,
3233 with_row_last_updated_at_version: bool,
3234 with_row_created_at_version: bool,
3235 with_make_deletions_null: bool,
3236 projection: Arc<Schema>,
3237 fragments: Arc<Vec<Fragment>>,
3238 range: Option<Range<u64>>,
3239 ordered: bool,
3240 ) -> Arc<dyn ExecutionPlan> {
3241 log::trace!("scan_fragments covered {} fragments", fragments.len());
3242 let config = LanceScanConfig {
3243 batch_size: self.get_batch_size(),
3244 batch_readahead: self.batch_readahead,
3245 fragment_readahead: self.fragment_readahead,
3246 io_buffer_size: self.get_io_buffer_size(),
3247 with_row_id,
3248 with_row_address,
3249 with_row_last_updated_at_version,
3250 with_row_created_at_version,
3251 with_make_deletions_null,
3252 ordered_output: ordered,
3253 };
3254 Arc::new(LanceScanExec::new(
3255 self.dataset.clone(),
3256 fragments,
3257 range,
3258 projection,
3259 config,
3260 ))
3261 }
3262
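/// Build a scan that pushes the refine filter down into the file reader.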
3263 fn pushdown_scan(
3264 &self,
3265 make_deletions_null: bool,
3266 filter_plan: &FilterPlan,
3267 ) -> Result<Arc<dyn ExecutionPlan>> {
3268 log::trace!("pushdown_scan");
3269
3270 let config = ScanConfig {
3271 batch_readahead: self.batch_readahead,
3272 fragment_readahead: self
3273 .fragment_readahead
3274 .unwrap_or(LEGACY_DEFAULT_FRAGMENT_READAHEAD),
3275 with_row_id: self.projection_plan.physical_projection.with_row_id,
3276 with_row_address: self.projection_plan.physical_projection.with_row_addr,
3277 make_deletions_null,
3278 ordered_output: self.ordered,
3279 file_reader_options: self
3280 .file_reader_options
3281 .clone()
3282 .or_else(|| self.dataset.file_reader_options.clone()),
3283 };
3284
3285 let fragments = if let Some(fragment) = self.fragments.as_ref() {
3286 Arc::new(fragment.clone())
3287 } else {
3288 self.dataset.fragments().clone()
3289 };
3290
3291 Ok(Arc::new(LancePushdownScanExec::try_new(
3292 self.dataset.clone(),
3293 fragments,
3294 Arc::new(self.projection_plan.physical_projection.to_bare_schema()),
3295 filter_plan.refine_expr.clone().unwrap(),
3296 config,
3297 )?))
3298 }
3299
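/// Compute exact distances for every input row, apply any distance
/// bounds, and keep the k nearest rows (ties broken by row id), dropping
/// rows with null distances.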
3300 fn flat_knn(&self, input: Arc<dyn ExecutionPlan>, q: &Query) -> Result<Arc<dyn ExecutionPlan>> {
3302 let flat_dist = Arc::new(KNNVectorDistanceExec::try_new(
3303 input,
3304 &q.column,
3305 q.key.clone(),
3306 q.metric_type,
3307 )?);
3308
3309 let lower: Option<(Expr, Arc<dyn PhysicalExpr>)> = q
3310 .lower_bound
3311 .map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
3312 let logical = col(DIST_COL).gt_eq(lit(v));
3313 let schema = flat_dist.schema();
3314 let df_schema = DFSchema::try_from(schema)?;
3315 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3316 Ok((logical, physical))
3317 })
3318 .transpose()?;
3319
3320 let upper = q
3321 .upper_bound
3322 .map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
3323 let logical = col(DIST_COL).lt(lit(v));
3324 let schema = flat_dist.schema();
3325 let df_schema = DFSchema::try_from(schema)?;
3326 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3327 Ok((logical, physical))
3328 })
3329 .transpose()?;
3330
3331 let filter_expr = match (lower, upper) {
3332 (Some((llog, _)), Some((ulog, _))) => {
3333 let logical = llog.and(ulog);
3334 let schema = flat_dist.schema();
3335 let df_schema = DFSchema::try_from(schema)?;
3336 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3337 Some((logical, physical))
3338 }
3339 (Some((llog, lphys)), None) => Some((llog, lphys)),
3340 (None, Some((ulog, uphys))) => Some((ulog, uphys)),
3341 (None, None) => None,
3342 };
3343
3344 let knn_plan: Arc<dyn ExecutionPlan> = if let Some(filter_expr) = filter_expr {
3345 Arc::new(LanceFilterExec::try_new(filter_expr.0, flat_dist)?)
3346 } else {
3347 flat_dist
3348 };
3349
3350 let sort = SortExec::new(
3352 [
3353 PhysicalSortExpr {
3354 expr: expressions::col(DIST_COL, knn_plan.schema().as_ref())?,
3355 options: SortOptions {
3356 descending: false,
3357 nulls_first: false,
3358 },
3359 },
3360 PhysicalSortExpr {
3361 expr: expressions::col(ROW_ID, knn_plan.schema().as_ref())?,
3362 options: SortOptions {
3363 descending: false,
3364 nulls_first: false,
3365 },
3366 },
3367 ]
3368 .into(),
3369 knn_plan,
3370 )
3371 .with_fetch(Some(q.k));
3372
3373 let logical_not_null = col(DIST_COL).is_not_null();
3374 let not_nulls = Arc::new(LanceFilterExec::try_new(logical_not_null, Arc::new(sort))?);
3375
3376 Ok(not_nulls)
3377 }
3378
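/// The ids of the fragments this scan targets, as a bitmap.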
3379 fn get_fragments_as_bitmap(&self) -> RoaringBitmap {
3380 if let Some(fragments) = &self.fragments {
3381 RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32))
3382 } else {
3383 RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32))
3384 }
3385 }
3386
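/// The subset of targeted fragments covered by every index delta; if any
/// delta lacks a fragment bitmap, all targeted fragments are assumed
/// covered.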
3387 fn get_indexed_frags(&self, index: &[IndexMetadata]) -> RoaringBitmap {
3388 let all_fragments = self.get_fragments_as_bitmap();
3389
3390 let mut all_indexed_frags = RoaringBitmap::new();
3391 for idx in index {
3392 if let Some(fragmap) = idx.fragment_bitmap.as_ref() {
3393 all_indexed_frags |= fragmap;
3394 } else {
3395 return all_fragments;
3398 }
3399 }
3400
3401 all_indexed_frags & all_fragments
3402 }
3403
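/// Build the approximate nearest neighbor plan over the index deltas,
/// sorted by distance and fetching `k * refine_factor` rows.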
3404 async fn ann(
3406 &self,
3407 q: &Query,
3408 index: &[IndexMetadata],
3409 filter_plan: &FilterPlan,
3410 ) -> Result<Arc<dyn ExecutionPlan>> {
3411 let prefilter_source = self
3412 .prefilter_source(filter_plan, self.get_indexed_frags(index))
3413 .await?;
3414 let inner_fanout_search = new_knn_exec(self.dataset.clone(), index, q, prefilter_source)?;
3415 let sort_expr = PhysicalSortExpr {
3416 expr: expressions::col(DIST_COL, inner_fanout_search.schema().as_ref())?,
3417 options: SortOptions {
3418 descending: false,
3419 nulls_first: false,
3420 },
3421 };
3422 let sort_expr_row_id = PhysicalSortExpr {
3423 expr: expressions::col(ROW_ID, inner_fanout_search.schema().as_ref())?,
3424 options: SortOptions {
3425 descending: false,
3426 nulls_first: false,
3427 },
3428 };
3429 Ok(Arc::new(
3430 SortExec::new([sort_expr, sort_expr_row_id].into(), inner_fanout_search)
3431 .with_fetch(Some(q.k * q.refine_factor.unwrap_or(1) as usize)),
3432 ))
3433 }
3434
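/// Build the ANN plan for a multivector column: search once per query
/// vector with over-fetch, then merge and score the partial results.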
3435 async fn multivec_ann(
3437 &self,
3438 q: &Query,
3439 index: &[IndexMetadata],
3440 filter_plan: &FilterPlan,
3441 ) -> Result<Arc<dyn ExecutionPlan>> {
3442 let over_fetch_factor = *DEFAULT_XTR_OVERFETCH;
3447
3448 let prefilter_source = self
3449 .prefilter_source(filter_plan, self.get_indexed_frags(index))
3450 .await?;
3451 let dim = get_vector_dim(self.dataset.schema(), &q.column)?;
3452
3453 let num_queries = q.key.len() / dim;
3454 let new_queries = (0..num_queries)
3455 .map(|i| q.key.slice(i * dim, dim))
3456 .map(|query_vec| {
3457 let mut new_query = q.clone();
3458 new_query.key = query_vec;
3459 new_query.refine_factor = Some(over_fetch_factor);
3463 new_query
3464 });
3465 let mut ann_nodes = Vec::with_capacity(new_queries.len());
3466 for query in new_queries {
3467 let ann_node = new_knn_exec(
3469 self.dataset.clone(),
3470 index,
3471 &query,
3472 prefilter_source.clone(),
3473 )?;
3474 let sort_expr = PhysicalSortExpr {
3475 expr: expressions::col(DIST_COL, ann_node.schema().as_ref())?,
3476 options: SortOptions {
3477 descending: false,
3478 nulls_first: false,
3479 },
3480 };
3481 let sort_expr_row_id = PhysicalSortExpr {
3482 expr: expressions::col(ROW_ID, ann_node.schema().as_ref())?,
3483 options: SortOptions {
3484 descending: false,
3485 nulls_first: false,
3486 },
3487 };
3488 let ann_node = Arc::new(
3489 SortExec::new([sort_expr, sort_expr_row_id].into(), ann_node)
3490 .with_fetch(Some(q.k * over_fetch_factor as usize)),
3491 );
3492 ann_nodes.push(ann_node as Arc<dyn ExecutionPlan>);
3493 }
3494
3495 let ann_node = Arc::new(MultivectorScoringExec::try_new(ann_nodes, q.clone())?);
3496
3497 let sort_expr = PhysicalSortExpr {
3498 expr: expressions::col(DIST_COL, ann_node.schema().as_ref())?,
3499 options: SortOptions {
3500 descending: false,
3501 nulls_first: false,
3502 },
3503 };
3504 let sort_expr_row_id = PhysicalSortExpr {
3505 expr: expressions::col(ROW_ID, ann_node.schema().as_ref())?,
3506 options: SortOptions {
3507 descending: false,
3508 nulls_first: false,
3509 },
3510 };
3511 let ann_node = Arc::new(
3512 SortExec::new([sort_expr, sort_expr_row_id].into(), ann_node)
3513 .with_fetch(Some(q.k * q.refine_factor.unwrap_or(1) as usize)),
3514 );
3515
3516 Ok(ann_node)
3517 }
3518
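/// Build the prefilter input for an indexed search: none when there is no
/// filter, a scalar index query when it exactly covers the required
/// fragments, and otherwise a filtered read that yields matching row ids.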
3519 async fn prefilter_source(
3524 &self,
3525 filter_plan: &FilterPlan,
3526 required_frags: RoaringBitmap,
3527 ) -> Result<PreFilterSource> {
3528 if filter_plan.is_empty() {
3529 log::trace!("no filter plan, no prefilter");
3530 return Ok(PreFilterSource::None);
3531 }
3532
3533 let fragments = Arc::new(
3534 self.dataset
3535 .manifest
3536 .fragments
3537 .iter()
3538 .filter(|f| required_frags.contains(f.id as u32))
3539 .cloned()
3540 .collect::<Vec<_>>(),
3541 );
3542
3543 if filter_plan.is_exact_index_search() && self.fragments.is_none() {
3549 let index_query = filter_plan.index_query.as_ref().expect_ok()?;
3550 let (_, missing_frags) = self
3551 .partition_frags_by_coverage(index_query, fragments.clone())
3552 .await?;
3553
3554 if missing_frags.is_empty() {
3555 log::trace!("prefilter entirely satisfied by exact index search");
3556 return Ok(PreFilterSource::ScalarIndexQuery(Arc::new(
3561 ScalarIndexExec::new(self.dataset.clone(), index_query.clone()),
3562 )));
3563 } else {
3564 log::trace!("exact index search did not cover all fragments");
3565 }
3566 }
3567
3568 log::trace!(
3570 "prefilter is a filtered read of {} fragments",
3571 fragments.len()
3572 );
3573 let PlannedFilteredScan { plan, .. } = self
3574 .filtered_read(
3575 filter_plan,
3576 self.dataset.empty_projection().with_row_id(),
3577 false,
3578 Some(fragments),
3579 None,
3580 true,
3581 )
3582 .await?;
3583 Ok(PreFilterSource::FilteredRowIds(plan))
3584 }
3585
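/// Take any remaining columns in `output_projection` for the rows
/// produced by `input`, coalescing batches first; a no-op if nothing
/// needs to be taken.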
3586 fn take(
3588 &self,
3589 input: Arc<dyn ExecutionPlan>,
3590 output_projection: Projection,
3591 ) -> Result<Arc<dyn ExecutionPlan>> {
3592 let coalesced = Arc::new(CoalesceBatchesExec::new(
3593 input.clone(),
3594 self.get_batch_size(),
3595 ));
3596 if let Some(take_plan) =
3597 TakeExec::try_new(self.dataset.clone(), coalesced, output_projection)?
3598 {
3599 Ok(Arc::new(take_plan))
3600 } else {
3601 Ok(input)
3603 }
3604 }
3605
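/// Wrap the plan in a global offset/limit node.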
3606 fn limit_node(&self, plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
3608 Arc::new(GlobalLimitExec::new(
3609 plan,
3610 *self.offset.as_ref().unwrap_or(&0) as usize,
3611 self.limit.map(|l| l as usize),
3612 ))
3613 }
3614
3615 #[instrument(level = "info", skip(self))]
3616 pub async fn analyze_plan(&self) -> Result<String> {
3617 let plan = self.create_plan().await?;
3618 analyze_plan(
3619 plan,
3620 LanceExecutionOptions {
3621 batch_size: self.batch_size,
3622 ..Default::default()
3623 },
3624 )
3625 .await
3627 }
3628
3629 #[instrument(level = "info", skip(self))]
3630 pub async fn explain_plan(&self, verbose: bool) -> Result<String> {
3631 let plan = self.create_plan().await?;
3632 let display = DisplayableExecutionPlan::new(plan.as_ref());
3633
3634 Ok(format!("{}", display.indent(verbose)))
3635 }
3636}
3637
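/// A stream of record batches coming out of a dataset scan, converting
/// physical-only types to their logical form where required.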
3638#[pin_project::pin_project]
3642pub struct DatasetRecordBatchStream {
3643 #[pin]
3644 exec_node: SendableRecordBatchStream,
3645 span: Span,
3646}
3647
3648impl DatasetRecordBatchStream {
3649 pub fn new(exec_node: SendableRecordBatchStream) -> Self {
3650 let schema = exec_node.schema();
3651 let adapter = SchemaAdapter::new(schema.clone());
3652 let exec_node = if SchemaAdapter::requires_logical_conversion(&schema) {
3653 adapter.to_logical_stream(exec_node)
3654 } else {
3655 exec_node
3656 };
3657
3658 let span = info_span!("DatasetRecordBatchStream");
3659 Self { exec_node, span }
3660 }
3661}
3662
3663impl RecordBatchStream for DatasetRecordBatchStream {
3664 fn schema(&self) -> SchemaRef {
3665 self.exec_node.schema()
3666 }
3667}
3668
3669impl Stream for DatasetRecordBatchStream {
3670 type Item = Result<RecordBatch>;
3671
3672 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3673 let mut this = self.project();
3674 let _guard = this.span.enter();
3675 match this.exec_node.poll_next_unpin(cx) {
3676 Poll::Ready(result) => {
3677 Poll::Ready(result.map(|r| r.map_err(|e| Error::io(e.to_string(), location!()))))
3678 }
3679 Poll::Pending => Poll::Pending,
3680 }
3681 }
3682}
3683
3684impl From<DatasetRecordBatchStream> for SendableRecordBatchStream {
3685 fn from(stream: DatasetRecordBatchStream) -> Self {
3686 stream.exec_node
3687 }
3688}
3689
3690#[cfg(test)]
3691pub mod test_dataset {
3692
3693 use super::*;
3694
3695 use std::{collections::HashMap, vec};
3696
3697 use arrow_array::{
3698 ArrayRef, FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray,
3699 };
3700 use arrow_schema::{ArrowError, DataType};
3701 use lance_arrow::FixedSizeListArrayExt;
3702 use lance_core::utils::tempfile::TempStrDir;
3703 use lance_file::version::LanceFileVersion;
3704 use lance_index::{
3705 scalar::{inverted::tokenizer::InvertedIndexParams, ScalarIndexParams},
3706 IndexType,
3707 };
3708
3709 use crate::dataset::WriteParams;
3710 use crate::index::vector::VectorIndexParams;
3711
3712 pub struct TestVectorDataset {
3722 pub tmp_dir: TempStrDir,
3723 pub schema: Arc<ArrowSchema>,
3724 pub dataset: Dataset,
3725 dimension: u32,
3726 }
3727
3728 impl TestVectorDataset {
3729 pub async fn new(
3730 data_storage_version: LanceFileVersion,
3731 stable_row_ids: bool,
3732 ) -> Result<Self> {
3733 Self::new_with_dimension(data_storage_version, stable_row_ids, 32).await
3734 }
3735
3736 pub async fn new_with_dimension(
3737 data_storage_version: LanceFileVersion,
3738 stable_row_ids: bool,
3739 dimension: u32,
3740 ) -> Result<Self> {
3741 let path = TempStrDir::default();
3742
3743 let metadata: HashMap<String, String> =
3745 vec![("dataset".to_string(), "vector".to_string())]
3746 .into_iter()
3747 .collect();
3748
3749 let schema = Arc::new(ArrowSchema::new_with_metadata(
3750 vec![
3751 ArrowField::new("i", DataType::Int32, true),
3752 ArrowField::new("s", DataType::Utf8, true),
3753 ArrowField::new(
3754 "vec",
3755 DataType::FixedSizeList(
3756 Arc::new(ArrowField::new("item", DataType::Float32, true)),
3757 dimension as i32,
3758 ),
3759 true,
3760 ),
3761 ],
3762 metadata,
3763 ));
3764
3765 let batches: Vec<RecordBatch> = (0..5)
3766 .map(|i| {
3767 let vector_values: Float32Array =
3768 (0..dimension * 80).map(|v| v as f32).collect();
3769 let vectors =
3770 FixedSizeListArray::try_new_from_values(vector_values, dimension as i32)
3771 .unwrap();
3772 RecordBatch::try_new(
3773 schema.clone(),
3774 vec![
3775 Arc::new(Int32Array::from_iter_values(i * 80..(i + 1) * 80)),
3776 Arc::new(StringArray::from_iter_values(
3777 (i * 80..(i + 1) * 80).map(|v| format!("s-{}", v)),
3778 )),
3779 Arc::new(vectors),
3780 ],
3781 )
3782 })
3783 .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
3784
3785 let params = WriteParams {
3786 max_rows_per_group: 10,
3787 max_rows_per_file: 200,
3788 data_storage_version: Some(data_storage_version),
3789 enable_stable_row_ids: stable_row_ids,
3790 ..Default::default()
3791 };
3792 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
3793
3794 let dataset = Dataset::write(reader, &path, Some(params)).await?;
3795
3796 Ok(Self {
3797 tmp_dir: path,
3798 schema,
3799 dataset,
3800 dimension,
3801 })
3802 }
3803
3804 pub async fn make_vector_index(&mut self) -> Result<()> {
3805 let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
3806 self.dataset
3807 .create_index(
3808 &["vec"],
3809 IndexType::Vector,
3810 Some("idx".to_string()),
3811 ¶ms,
3812 true,
3813 )
3814 .await
3815 }
3816
3817 pub async fn make_scalar_index(&mut self) -> Result<()> {
3818 self.dataset
3819 .create_index(
3820 &["i"],
3821 IndexType::Scalar,
3822 None,
3823 &ScalarIndexParams::default(),
3824 true,
3825 )
3826 .await
3827 }
3828
3829 pub async fn make_fts_index(&mut self) -> Result<()> {
3830 let params = InvertedIndexParams::default().with_position(true);
3831 self.dataset
3832 .create_index(&["s"], IndexType::Inverted, None, ¶ms, true)
3833 .await
3834 }
3835
3836 pub async fn append_new_data(&mut self) -> Result<()> {
3837 let vector_values: Float32Array = (0..10)
3838 .flat_map(|i| vec![i as f32; self.dimension as usize].into_iter())
3839 .collect();
3840 let new_vectors =
3841 FixedSizeListArray::try_new_from_values(vector_values, self.dimension as i32)
3842 .unwrap();
3843 let new_data: Vec<ArrayRef> = vec![
3844 Arc::new(Int32Array::from_iter_values(400..410)),
3845 Arc::new(StringArray::from_iter_values(
3846 (400..410).map(|v| format!("s-{}", v)),
3847 )),
3848 Arc::new(new_vectors),
3849 ];
3850 let reader = RecordBatchIterator::new(
3851 vec![RecordBatch::try_new(self.schema.clone(), new_data).unwrap()]
3852 .into_iter()
3853 .map(Ok),
3854 self.schema.clone(),
3855 );
3856 self.dataset.append(reader, None).await?;
3857 Ok(())
3858 }
3859 }
3860}
3861
3862#[cfg(test)]
3863mod test {
3864
3865 use std::collections::BTreeSet;
3866 use std::time::{Duration, Instant};
3867 use std::vec;
3868
3869 use arrow::array::as_primitive_array;
3870 use arrow::datatypes::{Float64Type, Int32Type, Int64Type};
3871 use arrow_array::cast::AsArray;
3872 use arrow_array::types::{Float32Type, UInt64Type};
3873 use arrow_array::{
3874 ArrayRef, FixedSizeListArray, Float16Array, Int32Array, LargeStringArray, PrimitiveArray,
3875 RecordBatchIterator, StringArray, StructArray,
3876 };
3877
3878 use arrow_ord::sort::sort_to_indices;
3879 use arrow_schema::Fields;
3880 use arrow_select::take;
3881 use datafusion::logical_expr::{col, lit};
3882 use half::f16;
3883 use lance_arrow::SchemaExt;
3884 use lance_core::utils::tempfile::TempStrDir;
3885 use lance_core::{ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION};
3886 use lance_datagen::{
3887 array, gen_batch, ArrayGeneratorExt, BatchCount, ByteCount, Dimension, RowCount,
3888 };
3889 use lance_file::version::LanceFileVersion;
3890 use lance_index::optimize::OptimizeOptions;
3891 use lance_index::scalar::inverted::query::{MatchQuery, PhraseQuery};
3892 use lance_index::vector::hnsw::builder::HnswBuildParams;
3893 use lance_index::vector::ivf::IvfBuildParams;
3894 use lance_index::vector::pq::PQBuildParams;
3895 use lance_index::vector::sq::builder::SQBuildParams;
3896 use lance_index::{scalar::ScalarIndexParams, IndexType};
3897 use lance_io::assert_io_gt;
3898 use lance_io::object_store::ObjectStoreParams;
3899 use lance_io::utils::tracking_store::IOTracker;
3900 use lance_linalg::distance::DistanceType;
3901 use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
3902 use object_store::throttle::ThrottleConfig;
3903 use rstest::rstest;
3904
3905 use super::*;
3906 use crate::dataset::optimize::{compact_files, CompactionOptions};
3907 use crate::dataset::scanner::test_dataset::TestVectorDataset;
3908 use crate::dataset::WriteMode;
3909 use crate::dataset::WriteParams;
3910 use crate::index::vector::{StageParams, VectorIndexParams};
3911 use crate::utils::test::{
3912 assert_plan_node_equals, DatagenExt, FragmentCount, FragmentRowCount, ThrottledStoreWrapper,
3913 };
3914
3915 #[tokio::test]
3916 async fn test_batch_size() {
3917 let schema = Arc::new(ArrowSchema::new(vec![
3918 ArrowField::new("i", DataType::Int32, true),
3919 ArrowField::new("s", DataType::Utf8, true),
3920 ]));
3921
3922 let batches: Vec<RecordBatch> = (0..5)
3923 .map(|i| {
3924 RecordBatch::try_new(
3925 schema.clone(),
3926 vec![
3927 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
3928 Arc::new(StringArray::from_iter_values(
3929 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
3930 )),
3931 ],
3932 )
3933 .unwrap()
3934 })
3935 .collect();
3936
3937 for use_filter in [false, true] {
3938 let test_dir = TempStrDir::default();
3939 let test_uri = &test_dir;
3940 let write_params = WriteParams {
3941 max_rows_per_file: 40,
3942 max_rows_per_group: 10,
3943 ..Default::default()
3944 };
3945 let batches =
3946 RecordBatchIterator::new(batches.clone().into_iter().map(Ok), schema.clone());
3947 Dataset::write(batches, test_uri, Some(write_params))
3948 .await
3949 .unwrap();
3950
3951 let dataset = Dataset::open(test_uri).await.unwrap();
3952 let mut builder = dataset.scan();
3953 builder.batch_size(8);
3954 if use_filter {
3955 builder.filter("i IS NOT NULL").unwrap();
3956 }
3957 let mut stream = builder.try_into_stream().await.unwrap();
3958 let mut rows_read = 0;
3959 while let Some(next) = stream.next().await {
3960 let next = next.unwrap();
3961 let expected = 8.min(100 - rows_read);
3962 assert_eq!(next.num_rows(), expected);
3963 rows_read += next.num_rows();
3964 }
3965 }
3966 }
3967
3968 #[tokio::test]
3969 async fn test_strict_batch_size() {
3970 let dataset = lance_datagen::gen_batch()
3971 .col("x", array::step::<Int32Type>())
3972 .anon_col(array::step::<Int64Type>())
3973 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
3974 .await
3975 .unwrap();
3976
3977 let mut scan = dataset.scan();
3978 scan.batch_size(10)
3979 .strict_batch_size(true)
3980 .filter("x % 2 == 0")
3981 .unwrap();
3982
3983 let batches = scan
3984 .try_into_stream()
3985 .await
3986 .unwrap()
3987 .try_collect::<Vec<_>>()
3988 .await
3989 .unwrap();
3990
3991 let batch_sizes = batches.iter().map(|b| b.num_rows()).collect::<Vec<_>>();
3992 assert_eq!(batch_sizes, vec![10, 10, 1]);
3993 }
3994
3995 #[tokio::test]
3996 async fn test_column_not_exist() {
3997 let dataset = lance_datagen::gen_batch()
3998 .col("x", array::step::<Int32Type>())
3999 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
4000 .await
4001 .unwrap();
4002
4003 let check_err_msg = |r: Result<DatasetRecordBatchStream>| {
4004 let Err(err) = r else {
4005 panic!(
4006 "Expected an error to be raised saying column y is not found but got no error"
4007 )
4008 };
4009
4010 assert!(
4011 err.to_string().contains("No field named y"),
4012 "Expected error to contain 'No field named y' but got {}",
4013 err
4014 );
4015 };
4016
4017 let mut scan = dataset.scan();
4018 scan.project(&["x", "y"]).unwrap();
4019 check_err_msg(scan.try_into_stream().await);
4020
4021 let mut scan = dataset.scan();
4022 scan.project(&["y"]).unwrap();
4023 check_err_msg(scan.try_into_stream().await);
4024
4025 let mut scan = dataset.scan();
4028 scan.project_with_transform(&[("foo", "1")]).unwrap();
4029 match scan.try_into_stream().await {
4030 Ok(_) => panic!("Expected an error to be raised saying not supported"),
4031 Err(e) => {
4032 assert!(
4033 e.to_string().contains("Received only dynamic expressions"),
4034 "Expected error to contain 'Received only dynamic expressions' but got {}",
4035 e
4036 );
4037 }
4038 }
4039 }
4040
4041 #[cfg(not(windows))]
4042 #[tokio::test]
4043 async fn test_local_object_store() {
4044 let schema = Arc::new(ArrowSchema::new(vec![
4045 ArrowField::new("i", DataType::Int32, true),
4046 ArrowField::new("s", DataType::Utf8, true),
4047 ]));
4048
4049 let batches: Vec<RecordBatch> = (0..5)
4050 .map(|i| {
4051 RecordBatch::try_new(
4052 schema.clone(),
4053 vec![
4054 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
4055 Arc::new(StringArray::from_iter_values(
4056 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
4057 )),
4058 ],
4059 )
4060 .unwrap()
4061 })
4062 .collect();
4063
4064 let test_dir = TempStrDir::default();
4065 let test_uri = &test_dir;
4066 let write_params = WriteParams {
4067 max_rows_per_file: 40,
4068 max_rows_per_group: 10,
4069 ..Default::default()
4070 };
4071 let batches = RecordBatchIterator::new(batches.clone().into_iter().map(Ok), schema.clone());
4072 Dataset::write(batches, test_uri, Some(write_params))
4073 .await
4074 .unwrap();
4075
4076 let dataset = Dataset::open(&format!("file-object-store://{}", test_uri))
4077 .await
4078 .unwrap();
4079 let mut builder = dataset.scan();
4080 builder.batch_size(8);
4081 let mut stream = builder.try_into_stream().await.unwrap();
4082 let mut rows_read = 0;
4083 while let Some(next) = stream.next().await {
4084 let next = next.unwrap();
4085 let expected = 8.min(100 - rows_read);
4086 assert_eq!(next.num_rows(), expected);
4087 rows_read += next.num_rows();
4088 }
4089 }
4090
4091 #[tokio::test]
4092 async fn test_filter_parsing() -> Result<()> {
4093 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false).await?;
4094 let dataset = &test_ds.dataset;
4095
4096 let mut scan = dataset.scan();
4097 assert!(scan.filter.is_none());
4098
4099 scan.filter("i > 50")?;
4100 assert_eq!(scan.get_filter().unwrap(), Some(col("i").gt(lit(50))));
4101
4102 for use_stats in [false, true] {
4103 let batches = scan
4104 .project(&["s"])?
4105 .use_stats(use_stats)
4106 .try_into_stream()
4107 .await?
4108 .try_collect::<Vec<_>>()
4109 .await?;
4110 let batch = concat_batches(&batches[0].schema(), &batches)?;
4111
4112 let expected_batch = RecordBatch::try_new(
4113 Arc::new(test_ds.schema.project(&[1])?),
4115 vec![Arc::new(StringArray::from_iter_values(
4116 (51..400).map(|v| format!("s-{}", v)),
4117 ))],
4118 )?;
4119 assert_eq!(batch, expected_batch);
4120 }
4121 Ok(())
4122 }
4123
4124 #[tokio::test]
4125 async fn test_scan_regexp_match_and_non_empty_captions() {
4126 let schema = Arc::new(ArrowSchema::new(vec![
4129 ArrowField::new("keywords", DataType::Utf8, true),
4130 ArrowField::new("natural_caption", DataType::Utf8, true),
4131 ArrowField::new("poetic_caption", DataType::Utf8, true),
4132 ]));
4133
4134 let batch = RecordBatch::try_new(
4135 schema.clone(),
4136 vec![
4137 Arc::new(StringArray::from(vec![
4138 Some("Liberty for all"),
4139 Some("peace"),
4140 Some("revolution now"),
4141 Some("Liberty"),
4142 Some("revolutionary"),
4143 Some("none"),
4144 ])) as ArrayRef,
4145 Arc::new(StringArray::from(vec![
4146 Some("a"),
4147 Some("b"),
4148 None,
4149 Some(""),
4150 Some("c"),
4151 Some("d"),
4152 ])) as ArrayRef,
4153 Arc::new(StringArray::from(vec![
4154 Some("x"),
4155 Some(""),
4156 Some("y"),
4157 Some("z"),
4158 None,
4159 Some("w"),
4160 ])) as ArrayRef,
4161 ],
4162 )
4163 .unwrap();
4164
4165 let reader = RecordBatchIterator::new(vec![Ok(batch.clone())], schema.clone());
4166 let dataset = Dataset::write(reader, "memory://", None).await.unwrap();
4167
4168 let mut scan = dataset.scan();
4169 scan.filter(
4170 "regexp_match(keywords, 'Liberty|revolution') AND \
4171 (natural_caption IS NOT NULL AND natural_caption <> '' AND \
4172 poetic_caption IS NOT NULL AND poetic_caption <> '')",
4173 )
4174 .unwrap();
4175
4176 let out = scan.try_into_batch().await.unwrap();
4177 assert_eq!(out.num_rows(), 1);
4178
4179 let out_keywords = out
4180 .column_by_name("keywords")
4181 .unwrap()
4182 .as_string::<i32>()
4183 .value(0);
4184 let out_nat = out
4185 .column_by_name("natural_caption")
4186 .unwrap()
4187 .as_string::<i32>()
4188 .value(0);
4189 let out_poetic = out
4190 .column_by_name("poetic_caption")
4191 .unwrap()
4192 .as_string::<i32>()
4193 .value(0);
4194
4195 assert_eq!(out_keywords, "Liberty for all");
4196 assert_eq!(out_nat, "a");
4197 assert_eq!(out_poetic, "x");
4198 }
4199
4200 #[tokio::test]
4201 async fn test_nested_projection() {
4202 let point_fields: Fields = vec![
4203 ArrowField::new("x", DataType::Float32, true),
4204 ArrowField::new("y", DataType::Float32, true),
4205 ]
4206 .into();
4207 let metadata_fields: Fields = vec![
4208 ArrowField::new("location", DataType::Struct(point_fields), true),
4209 ArrowField::new("age", DataType::Int32, true),
4210 ]
4211 .into();
4212 let metadata_field = ArrowField::new("metadata", DataType::Struct(metadata_fields), true);
4213 let schema = Arc::new(ArrowSchema::new(vec![
4214 metadata_field,
4215 ArrowField::new("idx", DataType::Int32, true),
4216 ]));
4217 let data = lance_datagen::rand(&schema)
4218 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
4219 .await
4220 .unwrap();
4221
4222 let mut scan = data.scan();
4223 scan.project(&["metadata.location.x", "metadata.age"])
4224 .unwrap();
4225 let batch = scan.try_into_batch().await.unwrap();
4226
4227 assert_eq!(
4228 batch.schema().as_ref(),
4229 &ArrowSchema::new(vec![
4230 ArrowField::new("metadata.location.x", DataType::Float32, true),
4231 ArrowField::new("metadata.age", DataType::Int32, true),
4232 ])
4233 );
4234
4235 let take_schema = data.schema().project_by_ids(&[0, 2, 4], false);
4239
4240 let taken = data.take_rows(&[0, 5], take_schema).await.unwrap();
4241
4242 let part_point_fields = Fields::from(vec![ArrowField::new("x", DataType::Float32, true)]);
4244 let part_metadata_fields = Fields::from(vec![
4245 ArrowField::new("location", DataType::Struct(part_point_fields), true),
4246 ArrowField::new("age", DataType::Int32, true),
4247 ]);
4248 let part_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
4249 "metadata",
4250 DataType::Struct(part_metadata_fields),
4251 true,
4252 )]));
4253
4254 assert_eq!(taken.schema(), part_schema);
4255 }
4256
4257 #[rstest]
4258 #[tokio::test]
4259 async fn test_limit(
4260 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4261 data_storage_version: LanceFileVersion,
4262 ) -> Result<()> {
4263 let test_ds = TestVectorDataset::new(data_storage_version, false).await?;
4264 let dataset = &test_ds.dataset;
4265
4266 let full_data = dataset.scan().try_into_batch().await?.slice(19, 2);
4267
4268 let actual = dataset
4269 .scan()
4270 .limit(Some(2), Some(19))?
4271 .try_into_batch()
4272 .await?;
4273
4274 assert_eq!(actual.num_rows(), 2);
4275 assert_eq!(actual, full_data);
4276 Ok(())
4277 }
4278
4279 #[test_log::test(tokio::test)]
4280 async fn test_limit_cancel() {
4281 let throttled = Arc::new(ThrottledStoreWrapper {
4290 config: ThrottleConfig {
4291 wait_get_per_call: Duration::from_secs(1),
4292 ..Default::default()
4293 },
4294 });
4295 let write_params = WriteParams {
4296 store_params: Some(ObjectStoreParams {
4297 object_store_wrapper: Some(throttled.clone()),
4298 ..Default::default()
4299 }),
4300 max_rows_per_file: 1,
4301 ..Default::default()
4302 };
4303
4304 let dataset = gen_batch()
4306 .col("i", array::step::<Int32Type>().with_random_nulls(0.1))
4307 .into_ram_dataset_with_params(
4308 FragmentCount::from(2000),
4309 FragmentRowCount::from(1),
4310 Some(write_params),
4311 )
4312 .await
4313 .unwrap();
4314
4315 let mut scan = dataset.scan();
4316 scan.filter("i IS NOT NULL").unwrap();
4317 scan.limit(Some(10), None).unwrap();
4318
4319 let start = Instant::now();
4320 scan.try_into_stream()
4321 .await
4322 .unwrap()
4323 .try_collect::<Vec<_>>()
4324 .await
4325 .unwrap();
4326 let duration = start.elapsed();
4327
4328 assert!(duration < Duration::from_secs(10));
4332 }
4333
4334 #[rstest]
4335 #[tokio::test]
4336 async fn test_knn_nodes(
4337 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4338 data_storage_version: LanceFileVersion,
4339 #[values(false, true)] stable_row_ids: bool,
4340 #[values(false, true)] build_index: bool,
4341 ) {
4342 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4343 .await
4344 .unwrap();
4345 if build_index {
4346 test_ds.make_vector_index().await.unwrap();
4347 }
4348 let dataset = &test_ds.dataset;
4349
4350 let mut scan = dataset.scan();
4351 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4352 scan.nearest("vec", &key, 5).unwrap();
4353 scan.refine(5);
4354
4355 let batch = scan.try_into_batch().await.unwrap();
4356
4357 assert_eq!(batch.num_rows(), 5);
4358 assert_eq!(
4359 batch.schema().as_ref(),
4360 &ArrowSchema::new(vec![
4361 ArrowField::new("i", DataType::Int32, true),
4362 ArrowField::new("s", DataType::Utf8, true),
4363 ArrowField::new(
4364 "vec",
4365 DataType::FixedSizeList(
4366 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4367 32,
4368 ),
4369 true,
4370 ),
4371 ArrowField::new(DIST_COL, DataType::Float32, true),
4372 ])
4373 .with_metadata([("dataset".into(), "vector".into())].into())
4374 );
4375
4376 let expected_i = BTreeSet::from_iter(vec![1, 81, 161, 241, 321]);
4377 let column_i = batch.column_by_name("i").unwrap();
4378 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4379 .values()
4380 .iter()
4381 .copied()
4382 .collect();
4383 assert_eq!(expected_i, actual_i);
4384 }
4385
4386 #[rstest]
4387 #[tokio::test]
4388 async fn test_can_project_distance() {
4389 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true)
4390 .await
4391 .unwrap();
4392 let dataset = &test_ds.dataset;
4393
4394 let mut scan = dataset.scan();
4395 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4396 scan.nearest("vec", &key, 5).unwrap();
4397 scan.refine(5);
4398 scan.project(&["_distance"]).unwrap();
4399
4400 let batch = scan.try_into_batch().await.unwrap();
4401
4402 assert_eq!(batch.num_rows(), 5);
4403 assert_eq!(batch.num_columns(), 1);
4404 }
4405
4406 #[rstest]
4407 #[tokio::test]
4408 async fn test_knn_with_new_data(
4409 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4410 data_storage_version: LanceFileVersion,
4411 #[values(false, true)] stable_row_ids: bool,
4412 ) {
4413 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4414 .await
4415 .unwrap();
4416 test_ds.make_vector_index().await.unwrap();
4417 test_ds.append_new_data().await.unwrap();
4418 let dataset = &test_ds.dataset;
4419
4420 let key: Float32Array = [0f32; 32].into_iter().collect();
4422 let k = 20;
4425
4426 #[derive(Debug)]
4427 struct TestCase {
4428 filter: Option<&'static str>,
4429 limit: Option<i64>,
4430 use_index: bool,
4431 }
4432
4433 let mut cases = vec![];
4434 for filter in [Some("i > 100"), None] {
4435 for limit in [None, Some(10)] {
4436 for use_index in [true, false] {
4437 cases.push(TestCase {
4438 filter,
4439 limit,
4440 use_index,
4441 });
4442 }
4443 }
4444 }
4445
4446 for case in cases {
4448 let mut scanner = dataset.scan();
4449 scanner
4450 .nearest("vec", &key, k)
4451 .unwrap()
4452 .limit(case.limit, None)
4453 .unwrap()
4454 .refine(3)
4455 .use_index(case.use_index);
4456 if let Some(filter) = case.filter {
4457 scanner.filter(filter).unwrap();
4458 }
4459
4460 let result = scanner
4461 .try_into_stream()
4462 .await
4463 .unwrap()
4464 .try_collect::<Vec<_>>()
4465 .await
4466 .unwrap();
4467 assert!(!result.is_empty());
4468 let result = concat_batches(&result[0].schema(), result.iter()).unwrap();
4469
4470 if case.filter.is_some() {
4471 let result_rows = result.num_rows();
4472 let expected_rows = case.limit.unwrap_or(k as i64) as usize;
4473 assert!(
4474 result_rows <= expected_rows,
4475 "Expected less than {} rows, got {}",
4476 expected_rows,
4477 result_rows
4478 );
4479 } else {
4480 assert_eq!(result.num_rows(), case.limit.unwrap_or(k as i64) as usize);
4482 }
4483
4484 assert_eq!(
4486 as_primitive_array::<Int32Type>(result.column(0).as_ref()).value(0),
4487 400
4488 );
4489 }
4490 }
4491
4492 #[rstest]
4493 #[tokio::test]
4494 async fn test_knn_with_prefilter(
4495 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4496 data_storage_version: LanceFileVersion,
4497 #[values(false, true)] stable_row_ids: bool,
4498 ) {
4499 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4500 .await
4501 .unwrap();
4502 test_ds.make_vector_index().await.unwrap();
4503 let dataset = &test_ds.dataset;
4504
4505 let mut scan = dataset.scan();
4506 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4507 scan.filter("i > 100").unwrap();
4508 scan.prefilter(true);
4509 scan.project(&["i", "vec"]).unwrap();
4510 scan.nearest("vec", &key, 5).unwrap();
4511 scan.use_index(false);
4512
4513 let results = scan
4514 .try_into_stream()
4515 .await
4516 .unwrap()
4517 .try_collect::<Vec<_>>()
4518 .await
4519 .unwrap();
4520
4521 assert_eq!(results.len(), 1);
4522 let batch = &results[0];
4523
4524 assert_eq!(batch.num_rows(), 5);
4525 assert_eq!(
4526 batch.schema().as_ref(),
4527 &ArrowSchema::new(vec![
4528 ArrowField::new("i", DataType::Int32, true),
4529 ArrowField::new(
4530 "vec",
4531 DataType::FixedSizeList(
4532 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4533 32,
4534 ),
4535 true,
4536 ),
4537 ArrowField::new(DIST_COL, DataType::Float32, true),
4538 ])
4539 .with_metadata([("dataset".into(), "vector".into())].into())
4540 );
4541
4542 let exact_i = BTreeSet::from_iter(vec![161, 241, 321]);
4544 let close_i = BTreeSet::from_iter(vec![161, 241, 321, 160, 162, 240, 242, 320, 322]);
4546 let column_i = batch.column_by_name("i").unwrap();
4547 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4548 .values()
4549 .iter()
4550 .copied()
4551 .collect();
4552 assert!(exact_i.is_subset(&actual_i));
4553 assert!(actual_i.is_subset(&close_i));
4554 }
4555
4556 #[rstest]
4557 #[tokio::test]
4558 async fn test_knn_filter_new_data(
4559 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4560 data_storage_version: LanceFileVersion,
4561 #[values(false, true)] stable_row_ids: bool,
4562 ) {
4563 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4567 .await
4568 .unwrap();
4569 test_ds.make_vector_index().await.unwrap();
4570 test_ds.append_new_data().await.unwrap();
4571 let dataset = &test_ds.dataset;
4572
4573 let key: Float32Array = [0f32; 32].into_iter().collect();
4575
4576 let mut query = dataset.scan();
4577 query.nearest("vec", &key, 20).unwrap();
4578
4579 let results = query
4581 .try_into_stream()
4582 .await
4583 .unwrap()
4584 .try_collect::<Vec<_>>()
4585 .await
4586 .unwrap();
4587
4588 let results_i = results[0]["i"]
4589 .as_primitive::<Int32Type>()
4590 .values()
4591 .iter()
4592 .copied()
4593 .collect::<BTreeSet<_>>();
4594
4595 assert!(results_i.contains(&400));
4596
    for prefilter in [false, true] {
        let mut query = dataset.scan();
        query
            .filter("i != 400")
            .unwrap()
            .prefilter(prefilter)
            .nearest("vec", &key, 20)
            .unwrap();

        let results = query
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        let results_i = results[0]["i"]
            .as_primitive::<Int32Type>()
            .values()
            .iter()
            .copied()
            .collect::<BTreeSet<_>>();

        assert!(!results_i.contains(&400));
    }
}

#[rstest]
#[tokio::test]
async fn test_knn_with_filter(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] stable_row_ids: bool,
) {
    let test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
        .await
        .unwrap();
    let dataset = &test_ds.dataset;

    let mut scan = dataset.scan();
    let key: Float32Array = (32..64).map(|v| v as f32).collect();
    scan.nearest("vec", &key, 5).unwrap();
    scan.filter("i > 100").unwrap();
    scan.project(&["i", "vec"]).unwrap();
    scan.refine(5);

    let results = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(results.len(), 1);
    let batch = &results[0];

    assert_eq!(batch.num_rows(), 3);
    assert_eq!(
        batch.schema().as_ref(),
        &ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int32, true),
            ArrowField::new(
                "vec",
                DataType::FixedSizeList(
                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
                    32,
                ),
                true,
            ),
            ArrowField::new(DIST_COL, DataType::Float32, true),
        ])
        .with_metadata([("dataset".into(), "vector".into())].into())
    );

    let expected_i = BTreeSet::from_iter(vec![161, 241, 321]);
    let column_i = batch.column_by_name("i").unwrap();
    let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
        .values()
        .iter()
        .copied()
        .collect();
    assert_eq!(expected_i, actual_i);
}

#[rstest]
#[tokio::test]
async fn test_refine_factor(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] stable_row_ids: bool,
) {
    let test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
        .await
        .unwrap();
    let dataset = &test_ds.dataset;

    let mut scan = dataset.scan();
    let key: Float32Array = (32..64).map(|v| v as f32).collect();
    scan.nearest("vec", &key, 5).unwrap();
    scan.refine(5);

    let results = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(results.len(), 1);
    let batch = &results[0];

    assert_eq!(batch.num_rows(), 5);
    assert_eq!(
        batch.schema().as_ref(),
        &ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int32, true),
            ArrowField::new("s", DataType::Utf8, true),
            ArrowField::new(
                "vec",
                DataType::FixedSizeList(
                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
                    32,
                ),
                true,
            ),
            ArrowField::new(DIST_COL, DataType::Float32, true),
        ])
        .with_metadata([("dataset".into(), "vector".into())].into())
    );

    let expected_i = BTreeSet::from_iter(vec![1, 81, 161, 241, 321]);
    let column_i = batch.column_by_name("i").unwrap();
    let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
        .values()
        .iter()
        .copied()
        .collect();
    assert_eq!(expected_i, actual_i);
}

#[rstest]
#[tokio::test]
async fn test_only_row_id(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_ds = TestVectorDataset::new(data_storage_version, false)
        .await
        .unwrap();
    let dataset = &test_ds.dataset;

    let mut scan = dataset.scan();
    scan.project::<&str>(&[]).unwrap().with_row_id();

    let batch = scan.try_into_batch().await.unwrap();

    assert_eq!(batch.num_columns(), 1);
    assert_eq!(batch.num_rows(), 400);
    let expected_schema =
        ArrowSchema::new(vec![ArrowField::new(ROW_ID, DataType::UInt64, true)])
            .with_metadata(dataset.schema().metadata.clone());
    assert_eq!(batch.schema().as_ref(), &expected_schema);

    let expected_row_ids: Vec<u64> = (0..200_u64).chain((1 << 32)..((1 << 32) + 200)).collect();
    let actual_row_ids: Vec<u64> = as_primitive_array::<UInt64Type>(batch.column(0).as_ref())
        .values()
        .iter()
        .copied()
        .collect();
    assert_eq!(expected_row_ids, actual_row_ids);
}

#[tokio::test]
async fn test_scan_unordered_with_row_id() {
    let test_ds = TestVectorDataset::new(LanceFileVersion::Legacy, false)
        .await
        .unwrap();
    let dataset = &test_ds.dataset;

    let mut scan = dataset.scan();
    scan.with_row_id();

    let ordered_batches = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<RecordBatch>>()
        .await
        .unwrap();
    assert!(ordered_batches.len() > 2);
    let ordered_batch =
        concat_batches(&ordered_batches[0].schema(), ordered_batches.iter()).unwrap();

    scan.scan_in_order(false);
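    // An unordered scan can still return rows in order by chance, so retry a few times.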
    for _ in 0..10 {
        let unordered_batches = scan
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<RecordBatch>>()
            .await
            .unwrap();
        let unordered_batch =
            concat_batches(&unordered_batches[0].schema(), unordered_batches.iter()).unwrap();

        assert_eq!(ordered_batch.num_rows(), unordered_batch.num_rows());

        if ordered_batch != unordered_batch {
            let sort_indices = sort_to_indices(&unordered_batch[ROW_ID], None, None).unwrap();

            let ordered_i = ordered_batch["i"].clone();
            let sorted_i = take::take(&unordered_batch["i"], &sort_indices, None).unwrap();

            assert_eq!(&ordered_i, &sorted_i);

            break;
        }
    }
}

#[tokio::test]
async fn test_scan_with_wildcard() {
    let data = gen_batch()
        .col("x", array::step::<Float64Type>())
        .col("y", array::step::<Float64Type>())
        .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(100))
        .await
        .unwrap();

    let check_cols = async |projection: &[&str], expected_cols: &[&str]| {
        let mut scan = data.scan();
        scan.project(projection).unwrap();
        let stream = scan.try_into_stream().await.unwrap();
        let schema = stream.schema();
        let field_names = schema.field_names();
        assert_eq!(field_names, expected_cols);
    };

    check_cols(&["*"], &["x", "y"]).await;
    check_cols(&["x", "y"], &["x", "y"]).await;
    check_cols(&["x"], &["x"]).await;
    check_cols(&["_rowid", "*"], &["_rowid", "x", "y"]).await;
    check_cols(&["*", "_rowid"], &["x", "y", "_rowid"]).await;
    check_cols(
        &["_rowid", "*", "_rowoffset"],
        &["_rowid", "x", "y", "_rowoffset"],
    )
    .await;

    let check_exprs = async |exprs: &[&str], expected_cols: &[&str]| {
        let mut scan = data.scan();
        let projection = exprs
            .iter()
            .map(|e| (e.to_string(), e.to_string()))
            .collect::<Vec<_>>();
        scan.project_with_transform(&projection).unwrap();
        let stream = scan.try_into_stream().await.unwrap();
        let schema = stream.schema();
        let field_names = schema.field_names();
        assert_eq!(field_names, expected_cols);
    };

    check_exprs(&["_rowid", "*", "x * 2"], &["_rowid", "x", "y", "x * 2"]).await;

    let check_fails = |projection: &[&str]| {
        let mut scan = data.scan();
        assert!(scan.project(projection).is_err());
    };

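    // Projections that duplicate a column, explicitly or via the wildcard, are rejected.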
    check_fails(&["x", "*"]);
    check_fails(&["_rowid", "_rowid"]);
}

#[rstest]
#[tokio::test]
async fn test_scan_order(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "i",
        DataType::Int32,
        true,
    )]));

    let batch1 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
    )
    .unwrap();

    let batch2 = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![6, 7, 8]))],
    )
    .unwrap();

    let params = WriteParams {
        mode: WriteMode::Append,
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };

    let write_batch = |batch: RecordBatch| async {
        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
        Dataset::write(reader, test_uri, Some(params)).await
    };

    write_batch.clone()(batch1.clone()).await.unwrap();
    write_batch(batch2.clone()).await.unwrap();

    let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());
    let fragment1 = dataset.get_fragment(0).unwrap().metadata().clone();
    let fragment2 = dataset.get_fragment(1).unwrap().metadata().clone();

    let mut scanner = dataset.scan();
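    // Scan with the fragments in their written order.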
    scanner.with_fragments(vec![fragment1.clone(), fragment2.clone()]);
    let output = scanner
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(output.len(), 2);
    assert_eq!(output[0], batch1);
    assert_eq!(output[1], batch2);

    let mut scanner = dataset.scan();
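    // Reverse the fragment order; the output should follow the requested order.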
    scanner.with_fragments(vec![fragment2, fragment1]);
    let output = scanner
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(output.len(), 2);
    assert_eq!(output[0], batch2);
    assert_eq!(output[1], batch1);
}

#[rstest]
#[tokio::test]
async fn test_scan_sort(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let data = gen_batch()
        .col("int", array::cycle::<Int32Type>(vec![5, 4, 1, 2, 3]))
        .col(
            "str",
            array::cycle_utf8_literals(&["a", "b", "c", "e", "d"]),
        );

    let sorted_by_int = gen_batch()
        .col("int", array::cycle::<Int32Type>(vec![1, 2, 3, 4, 5]))
        .col(
            "str",
            array::cycle_utf8_literals(&["c", "e", "d", "b", "a"]),
        )
        .into_batch_rows(RowCount::from(5))
        .unwrap();

    let sorted_by_str = gen_batch()
        .col("int", array::cycle::<Int32Type>(vec![5, 4, 1, 3, 2]))
        .col(
            "str",
            array::cycle_utf8_literals(&["a", "b", "c", "d", "e"]),
        )
        .into_batch_rows(RowCount::from(5))
        .unwrap();

    Dataset::write(
        data.into_reader_rows(RowCount::from(5), BatchCount::from(1)),
        test_uri,
        Some(WriteParams {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());

    let batches_by_int = dataset
        .scan()
        .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
            "int".to_string(),
        )]))
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(batches_by_int[0], sorted_by_int);

    let batches_by_str = dataset
        .scan()
        .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
            "str".to_string(),
        )]))
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(batches_by_str[0], sorted_by_str);

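    // An empty ordering is allowed and simply leaves the scan unsorted.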
    dataset
        .scan()
        .order_by(Some(vec![]))
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
}

#[rstest]
#[tokio::test]
async fn test_sort_multi_columns(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let data = gen_batch()
        .col("int", array::cycle::<Int32Type>(vec![5, 5, 1, 1, 3]))
        .col(
            "float",
            array::cycle::<Float32Type>(vec![7.3, -f32::NAN, f32::NAN, 4.3, f32::INFINITY]),
        );

    let sorted_by_int_then_float = gen_batch()
        .col("int", array::cycle::<Int32Type>(vec![1, 1, 3, 5, 5]))
        .col(
            "float",
            array::cycle::<Float32Type>(vec![4.3, f32::NAN, f32::INFINITY, -f32::NAN, 7.3]),
        )
        .into_batch_rows(RowCount::from(5))
        .unwrap();

    Dataset::write(
        data.into_reader_rows(RowCount::from(5), BatchCount::from(1)),
        test_uri,
        Some(WriteParams {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());

    let batches_by_int_then_float = dataset
        .scan()
        .order_by(Some(vec![
            ColumnOrdering::asc_nulls_first("int".to_string()),
            ColumnOrdering::asc_nulls_first("float".to_string()),
        ]))
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(batches_by_int_then_float[0], sorted_by_int_then_float);
}

#[rstest]
#[tokio::test]
async fn test_ann_prefilter(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] stable_row_ids: bool,
    #[values(
        VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2),
        VectorIndexParams::with_ivf_hnsw_sq_params(
            MetricType::L2,
            IvfBuildParams::new(2),
            HnswBuildParams::default(),
            SQBuildParams::default()
        )
    )]
    index_params: VectorIndexParams,
) {
    use lance_arrow::{fixed_size_list_type, FixedSizeListArrayExt};

    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new("filterable", DataType::Int32, true),
        ArrowField::new("vector", fixed_size_list_type(2, DataType::Float32), true),
    ]));

    let vector_values = Float32Array::from_iter_values((0..600).map(|x| x as f32));

    let batches = vec![RecordBatch::try_new(
        schema.clone(),
        vec![
            Arc::new(Int32Array::from_iter_values(0..300)),
            Arc::new(FixedSizeListArray::try_new_from_values(vector_values, 2).unwrap()),
        ],
    )
    .unwrap()];

    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        max_rows_per_file: 300,
        enable_stable_row_ids: stable_row_ids,
        ..Default::default()
    };
    let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    let mut dataset = Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();

    dataset
        .create_index(&["vector"], IndexType::Vector, None, &index_params, false)
        .await
        .unwrap();

    let query_key = Arc::new(Float32Array::from_iter_values((0..2).map(|x| x as f32)));
    let mut scan = dataset.scan();
    scan.filter("filterable > 5").unwrap();
    scan.nearest("vector", query_key.as_ref(), 1).unwrap();
    scan.minimum_nprobes(100);
    scan.with_row_id();

    let batches = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(batches.len(), 0);

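    // With a prefilter the predicate is applied before the ANN search, so a match survives.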
    scan.prefilter(true);

    let batches = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    assert_eq!(batches.len(), 1);

    let first_match = batches[0][ROW_ID].as_primitive::<UInt64Type>().values()[0];

    assert_eq!(6, first_match);
}

#[rstest]
#[tokio::test]
async fn test_filter_on_large_utf8(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "ls",
        DataType::LargeUtf8,
        true,
    )]));

    let batches = vec![RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(LargeStringArray::from_iter_values(
            (0..10).map(|v| format!("s-{}", v)),
        ))],
    )
    .unwrap()];

    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    let mut scan = dataset.scan();
    scan.filter("ls = 's-8'").unwrap();

    let batches = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    let batch = &batches[0];

    let expected = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(LargeStringArray::from_iter_values(
            (8..9).map(|v| format!("s-{}", v)),
        ))],
    )
    .unwrap();

    assert_eq!(batch, &expected);
}

#[rstest]
#[tokio::test]
async fn test_filter_with_regex(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;

    let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
        "ls",
        DataType::Utf8,
        true,
    )]));

    let batches = vec![RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from_iter_values(
            (0..20).map(|v| format!("s-{}", v)),
        ))],
    )
    .unwrap()];

    let write_params = WriteParams {
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    let mut scan = dataset.scan();
    scan.filter("regexp_match(ls, 's-1.')").unwrap();

    let stream = scan.try_into_stream().await.unwrap();
    let batches = stream.try_collect::<Vec<_>>().await.unwrap();
    let batch = &batches[0];

    let expected = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(StringArray::from_iter_values(
            (10..=19).map(|v| format!("s-{}", v)),
        ))],
    )
    .unwrap();

    assert_eq!(batch, &expected);
}

#[tokio::test]
async fn test_filter_proj_bug() {
    let struct_i_field = ArrowField::new("i", DataType::Int32, true);
    let struct_o_field = ArrowField::new("o", DataType::Utf8, true);
    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new(
            "struct",
            DataType::Struct(vec![struct_i_field.clone(), struct_o_field.clone()].into()),
            true,
        ),
        ArrowField::new("s", DataType::Utf8, true),
    ]));

    let input_batches: Vec<RecordBatch> = (0..5)
        .map(|i| {
            let struct_i_arr: Arc<Int32Array> =
                Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20));
            let struct_o_arr: Arc<StringArray> = Arc::new(StringArray::from_iter_values(
                (i * 20..(i + 1) * 20).map(|v| format!("o-{:02}", v)),
            ));
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(StructArray::from(vec![
                        (Arc::new(struct_i_field.clone()), struct_i_arr as ArrayRef),
                        (Arc::new(struct_o_field.clone()), struct_o_arr as ArrayRef),
                    ])),
                    Arc::new(StringArray::from_iter_values(
                        (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
                    )),
                ],
            )
            .unwrap()
        })
        .collect();
    let batches =
        RecordBatchIterator::new(input_batches.clone().into_iter().map(Ok), schema.clone());
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let write_params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10,
        data_storage_version: Some(LanceFileVersion::Legacy),
        ..Default::default()
    };
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    let batches = dataset
        .scan()
        .filter("struct.i >= 20")
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    let batch = concat_batches(&batches[0].schema(), &batches).unwrap();

    let expected_batch = concat_batches(&schema, &input_batches.as_slice()[1..]).unwrap();
    assert_eq!(batch, expected_batch);

    let batches = dataset
        .scan()
        .filter("struct.o >= 'o-20'")
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
    assert_eq!(batch, expected_batch);

    let batches = dataset
        .scan()
        .project(vec!["struct"].as_slice())
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    concat_batches(&batches[0].schema(), &batches).unwrap();
}

#[rstest]
#[tokio::test]
async fn test_ann_with_deletion(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] stable_row_ids: bool,
) {
    let vec_params = vec![
        VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 2),
    ];
    for params in vec_params {
        use lance_arrow::FixedSizeListArrayExt;

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("i", DataType::Int32, true),
            ArrowField::new(
                "vec",
                DataType::FixedSizeList(
                    Arc::new(ArrowField::new("item", DataType::Float32, true)),
                    32,
                ),
                true,
            ),
        ]));

        let vector_values: Float32Array =
            (0..32 * 512).map(|v| (v / 32) as f32 + 1.0).collect();
        let vectors = FixedSizeListArray::try_new_from_values(vector_values, 32).unwrap();

        let batches = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(0..512)),
                Arc::new(vectors.clone()),
            ],
        )
        .unwrap()];

        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(
            reader,
            test_uri,
            Some(WriteParams {
                data_storage_version: Some(data_storage_version),
                enable_stable_row_ids: stable_row_ids,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        assert_eq!(dataset.index_cache_entry_count().await, 0);
        dataset
            .create_index(
                &["vec"],
                IndexType::Vector,
                Some("idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        let mut scan = dataset.scan();
        let key: Float32Array = (0..32).map(|_v| 1.0_f32).collect();
        scan.nearest("vec", &key, 5).unwrap();
        scan.refine(100);
        scan.minimum_nprobes(100);

        assert_eq!(dataset.index_cache_entry_count().await, 2);
        let results = scan
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(
            dataset.index_cache_entry_count().await,
            5 + dataset.versions().await.unwrap().len()
        );
        assert_eq!(results.len(), 1);
        let batch = &results[0];

        let expected_i = BTreeSet::from_iter(vec![0, 1, 2, 3, 4]);
        let column_i = batch.column_by_name("i").unwrap();
        let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(expected_i, actual_i);

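        // Delete one of the top results and make sure it no longer shows up.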
        dataset.delete("i = 1").await.unwrap();
        let mut scan = dataset.scan();
        scan.nearest("vec", &key, 5).unwrap();
        scan.refine(100);
        scan.minimum_nprobes(100);

        let results = scan
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let batch = &results[0];

        let expected_i = BTreeSet::from_iter(vec![0, 2, 3, 4, 5]);
        let column_i = batch.column_by_name("i").unwrap();
        let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(expected_i, actual_i);

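        // Append new rows, rebuild the index, then delete all of the original rows.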
        let batches = vec![RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from_iter_values(512..1024)),
                Arc::new(vectors),
            ],
        )
        .unwrap()];

        let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
        let mut dataset = Dataset::write(
            reader,
            test_uri,
            Some(WriteParams {
                mode: WriteMode::Append,
                data_storage_version: Some(data_storage_version),
                ..Default::default()
            }),
        )
        .await
        .unwrap();
        dataset
            .create_index(
                &["vec"],
                IndexType::Vector,
                Some("idx".to_string()),
                &params,
                true,
            )
            .await
            .unwrap();

        dataset.delete("i < 512").await.unwrap();

        let mut scan = dataset.scan();
        scan.nearest("vec", &key, 5).unwrap();
        scan.refine(100);
        scan.minimum_nprobes(100);

        let results = scan
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(results.len(), 1);
        let batch = &results[0];

        let expected_i = BTreeSet::from_iter(vec![512, 513, 514, 515, 516]);
        let column_i = batch.column_by_name("i").unwrap();
        let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
            .values()
            .iter()
            .copied()
            .collect();
        assert_eq!(expected_i, actual_i);
    }
}

#[tokio::test]
async fn test_projection_order() {
    let vec_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 2);
    let mut data = gen_batch()
        .col("vec", array::rand_vec::<Float32Type>(Dimension::from(4)))
        .col("text", array::rand_utf8(ByteCount::from(10), false))
        .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(100))
        .await
        .unwrap();
    data.create_index(&["vec"], IndexType::Vector, None, &vec_params, true)
        .await
        .unwrap();

    let mut scan = data.scan();
    scan.nearest("vec", &Float32Array::from(vec![1.0, 1.0, 1.0, 1.0]), 5)
        .unwrap();
    scan.with_row_id().project(&["text"]).unwrap();

    let results = scan
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();

    assert_eq!(
        results[0].schema().field_names(),
        vec!["text", "_distance", "_rowid"]
    );
}

#[rstest]
#[tokio::test]
async fn test_count_rows_with_filter(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let mut data_gen = BatchGenerator::new().col(Box::new(
        IncrementingInt32::new().named("Filter_me".to_owned()),
    ));
    Dataset::write(
        data_gen.batch(32),
        test_uri,
        Some(WriteParams {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    assert_eq!(32, dataset.count_rows(None).await.unwrap());
    assert_eq!(
        16,
        dataset
            .count_rows(Some("`Filter_me` > 15".to_string()))
            .await
            .unwrap()
    );
}

#[rstest]
#[tokio::test]
async fn test_dynamic_projection(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let mut data_gen =
        BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned())));
    Dataset::write(
        data_gen.batch(32),
        test_uri,
        Some(WriteParams {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 32);

    let mut scanner = dataset.scan();

    let scan_res = scanner
        .project_with_transform(&[("bool", "i > 15")])
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();

    assert_eq!(1, scan_res.num_columns());

    let bool_col = scan_res
        .column_by_name("bool")
        .expect("bool column should exist");
    let bool_arr = bool_col.as_boolean();
    for i in 0..32 {
        if i > 15 {
            assert!(bool_arr.value(i));
        } else {
            assert!(!bool_arr.value(i));
        }
    }
}

#[rstest]
#[tokio::test]
async fn test_column_casting_function(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let mut data_gen =
        BatchGenerator::new().col(Box::new(RandomVector::new().named("vec".to_owned())));
    Dataset::write(
        data_gen.batch(32),
        test_uri,
        Some(WriteParams {
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();
    assert_eq!(dataset.count_rows(None).await.unwrap(), 32);

    let mut scanner = dataset.scan();

    let scan_res = scanner
        .project_with_transform(&[("f16", "_cast_list_f16(vec)")])
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();

    assert_eq!(1, scan_res.num_columns());
    assert_eq!(32, scan_res.num_rows());
    assert_eq!("f16", scan_res.schema().field(0).name());

    let mut scanner = dataset.scan();
    let scan_res_original = scanner
        .project(&["vec"])
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();

    let f32_col: &Float32Array = scan_res_original
        .column_by_name("vec")
        .unwrap()
        .as_fixed_size_list()
        .values()
        .as_primitive();
    let f16_col: &Float16Array = scan_res
        .column_by_name("f16")
        .unwrap()
        .as_fixed_size_list()
        .values()
        .as_primitive();

    for (f32_val, f16_val) in f32_col.iter().zip(f16_col.iter()) {
        let f32_val = f32_val.unwrap();
        let f16_val = f16_val.unwrap();
        assert_eq!(f16::from_f32(f32_val), f16_val);
    }
}

struct ScalarIndexTestFixture {
    _test_dir: TempStrDir,
    dataset: Dataset,
    sample_query: Arc<dyn Array>,
    delete_query: Arc<dyn Array>,
    // The initial version: 1000 rows with a vector index and a scalar index
    original_version: u64,
    // The delete version after compaction
    compact_version: u64,
    // The original version plus 1000 appended (unindexed) rows
    append_version: u64,
    // The append version after merging the new rows into the indices
    updated_version: u64,
    // The original version with one row deleted
    delete_version: u64,
    // The append version with one row deleted
    append_then_delete_version: u64,
}

#[derive(Debug, PartialEq)]
struct ScalarTestParams {
    use_index: bool,
    use_projection: bool,
    use_deleted_data: bool,
    use_new_data: bool,
    with_row_id: bool,
    use_compaction: bool,
    use_updated: bool,
}

impl ScalarIndexTestFixture {
    async fn new(data_storage_version: LanceFileVersion, use_stable_row_ids: bool) -> Self {
        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;

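        // Create a dataset with a vector column and two int32 columns, indexing
        // one int column ("indexed") but not the other ("not_indexed"), then
        // build several versions to exercise append/update/delete/compaction.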
        let data = gen_batch()
            .col(
                "vector",
                array::rand_vec::<Float32Type>(Dimension::from(32)),
            )
            .col("indexed", array::step::<Int32Type>())
            .col("not_indexed", array::step::<Int32Type>())
            .into_batch_rows(RowCount::from(1000))
            .unwrap();

        let mut dataset = Dataset::write(
            RecordBatchIterator::new(vec![Ok(data.clone())], data.schema().clone()),
            test_uri,
            Some(WriteParams {
                max_rows_per_file: 500,
                data_storage_version: Some(data_storage_version),
                enable_stable_row_ids: use_stable_row_ids,
                ..Default::default()
            }),
        )
        .await
        .unwrap();

        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                None,
                &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2),
                false,
            )
            .await
            .unwrap();

        dataset
            .create_index(
                &["indexed"],
                IndexType::Scalar,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();

        let original_version = dataset.version().version;
        let sample_query = data["vector"].as_fixed_size_list().value(50);
        let delete_query = data["vector"].as_fixed_size_list().value(75);

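        // Append the same vectors with the int columns shifted by 1000 so the
        // new rows are distinguishable from the original ones.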
        let new_indexed =
            arrow_arith::numeric::add(&data["indexed"], &Int32Array::new_scalar(1000)).unwrap();
        let new_not_indexed =
            arrow_arith::numeric::add(&data["indexed"], &Int32Array::new_scalar(1000)).unwrap();
        let append_data = RecordBatch::try_new(
            data.schema(),
            vec![data["vector"].clone(), new_indexed, new_not_indexed],
        )
        .unwrap();

        dataset
            .append(
                RecordBatchIterator::new(vec![Ok(append_data)], data.schema()),
                Some(WriteParams {
                    data_storage_version: Some(data_storage_version),
                    ..Default::default()
                }),
            )
            .await
            .unwrap();

        let append_version = dataset.version().version;

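        // Merge the appended rows into the indices to create the "updated" version.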
        dataset
            .optimize_indices(&OptimizeOptions::merge(1))
            .await
            .unwrap();
        let updated_version = dataset.version().version;

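        // Go back to the appended (unmerged) version and delete a row from the original data.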
        let mut dataset = dataset.checkout_version(append_version).await.unwrap();
        dataset.restore().await.unwrap();

        dataset.delete("not_indexed = 75").await.unwrap();

        let append_then_delete_version = dataset.version().version;

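        // From the original version, delete the same row to create the delete-only version.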
        let mut dataset = dataset.checkout_version(original_version).await.unwrap();
        dataset.restore().await.unwrap();

        dataset.delete("not_indexed = 75").await.unwrap();

        let delete_version = dataset.version().version;

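        // Compact from the delete version, then leave the fixture at the original version.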
        compact_files(&mut dataset, CompactionOptions::default(), None)
            .await
            .unwrap();
        let compact_version = dataset.version().version;
        let mut dataset = dataset.checkout_version(original_version).await.unwrap();
        dataset.restore().await.unwrap();

        Self {
            _test_dir: test_dir,
            dataset,
            sample_query,
            delete_query,
            original_version,
            compact_version,
            append_version,
            updated_version,
            delete_version,
            append_then_delete_version,
        }
    }

    fn sample_query(&self) -> &PrimitiveArray<Float32Type> {
        self.sample_query.as_primitive::<Float32Type>()
    }

    fn delete_query(&self) -> &PrimitiveArray<Float32Type> {
        self.delete_query.as_primitive::<Float32Type>()
    }

    async fn get_dataset(&self, params: &ScalarTestParams) -> Dataset {
        let version = if params.use_compaction {
            if params.use_deleted_data || params.use_new_data || params.use_updated {
                panic!(
                    "There is no test data combining new/deleted/updated data with compaction"
                );
            } else {
                self.compact_version
            }
        } else if params.use_updated {
            if params.use_deleted_data || params.use_new_data || params.use_compaction {
                panic!(
                    "There is no test data combining updated data with new/deleted/compaction"
                );
            } else {
                self.updated_version
            }
        } else {
            match (params.use_new_data, params.use_deleted_data) {
                (false, false) => self.original_version,
                (false, true) => self.delete_version,
                (true, false) => self.append_version,
                (true, true) => self.append_then_delete_version,
            }
        };
        self.dataset.checkout_version(version).await.unwrap()
    }

    async fn run_query(
        &self,
        query: &str,
        vector: Option<&PrimitiveArray<Float32Type>>,
        params: &ScalarTestParams,
    ) -> (String, RecordBatch) {
        let dataset = self.get_dataset(params).await;
        let mut scan = dataset.scan();
        if let Some(vector) = vector {
            scan.nearest("vector", vector, 10).unwrap();
        }
        if params.use_projection {
            scan.project(&["indexed"]).unwrap();
        }
        if params.with_row_id {
            scan.with_row_id();
        }
        scan.scan_in_order(true);
        scan.use_index(params.use_index);
        scan.filter(query).unwrap();
        scan.prefilter(true);

        let plan = scan.explain_plan(true).await.unwrap();
        let batch = scan.try_into_batch().await.unwrap();

        if params.use_projection {
            let mut expected_columns = 1;
            if vector.is_some() {
                expected_columns += 1;
            }
            if params.with_row_id {
                expected_columns += 1;
            }
            assert_eq!(batch.num_columns(), expected_columns);
        } else {
            let mut expected_columns = 3;
            if vector.is_some() {
                expected_columns += 1;
            }
            if params.with_row_id {
                expected_columns += 1;
            }
            assert_eq!(batch.num_columns(), expected_columns);
        }

        (plan, batch)
    }

    fn assert_none<F: Fn(i32) -> bool>(
        &self,
        batch: &RecordBatch,
        predicate: F,
        message: &str,
    ) {
        let indexed = batch["indexed"].as_primitive::<Int32Type>();
        if indexed.iter().map(|val| val.unwrap()).any(predicate) {
            panic!("{}", message);
        }
    }

    fn assert_one<F: Fn(i32) -> bool>(&self, batch: &RecordBatch, predicate: F, message: &str) {
        let indexed = batch["indexed"].as_primitive::<Int32Type>();
        if !indexed.iter().map(|val| val.unwrap()).any(predicate) {
            panic!("{}", message);
        }
    }

    async fn check_vector_scalar_indexed_and_refine(&self, params: &ScalarTestParams) {
        let (query_plan, batch) = self
            .run_query(
                "indexed != 50 AND ((not_indexed < 100) OR (not_indexed >= 1000 AND not_indexed < 1100))",
                Some(self.sample_query()),
                params,
            )
            .await;
        if self.dataset.is_legacy_storage() {
            assert!(query_plan.contains("MaterializeIndex"));
        }
        self.assert_none(
            &batch,
            |val| val == 50,
            "The query contained 50 even though it was filtered",
        );
        if !params.use_new_data {
            self.assert_none(
                &batch,
                |val| (100..1000).contains(&val) || (val >= 1100),
                "The non-indexed refine filter was not applied",
            );
        }

        if params.use_new_data || params.use_updated {
            self.assert_one(
                &batch,
                |val| val == 1050,
                "The query did not contain 1050 from the new data",
            );
        }
    }

    async fn check_vector_scalar_indexed_only(&self, params: &ScalarTestParams) {
        let (query_plan, batch) = self
            .run_query("indexed != 50", Some(self.sample_query()), params)
            .await;
        if self.dataset.is_legacy_storage() {
            if params.use_index {
                assert!(query_plan.contains("ScalarIndexQuery"));
            } else {
                assert!(query_plan.contains("MaterializeIndex"));
            }
        }
        self.assert_none(
            &batch,
            |val| val == 50,
            "The query contained 50 even though it was filtered",
        );
        if params.use_new_data {
            self.assert_one(
                &batch,
                |val| val == 1050,
                "The query did not contain 1050 from the new data",
            );
            // A filter that only matches the new (unindexed) data should still find it.
            let (_, batch) = self
                .run_query("indexed == 1050", Some(self.sample_query()), params)
                .await;
            assert_eq!(batch.num_rows(), 1);
        }
        if params.use_deleted_data {
            let (_, batch) = self
                .run_query("indexed == 75", Some(self.delete_query()), params)
                .await;
            if !params.use_new_data {
                assert_eq!(batch.num_rows(), 0);
            }
        }
    }

    async fn check_vector_queries(&self, params: &ScalarTestParams) {
        self.check_vector_scalar_indexed_only(params).await;
        self.check_vector_scalar_indexed_and_refine(params).await;
    }

    async fn check_simple_indexed_only(&self, params: &ScalarTestParams) {
        let (query_plan, batch) = self.run_query("indexed != 50", None, params).await;
        if self.dataset.is_legacy_storage() {
            assert!(query_plan.contains("MaterializeIndex"));
        } else {
            assert!(query_plan.contains("LanceRead"));
        }
        self.assert_none(
            &batch,
            |val| val == 50,
            "The query contained 50 even though it was filtered",
        );
        let mut expected_num_rows = if params.use_new_data || params.use_updated {
            1999
        } else {
            999
        };
        if params.use_deleted_data || params.use_compaction {
            expected_num_rows -= 1;
        }
        assert_eq!(batch.num_rows(), expected_num_rows);

        if params.use_new_data || params.use_updated {
            let (_, batch) = self.run_query("indexed == 1050", None, params).await;
            assert_eq!(batch.num_rows(), 1);
        }

        if params.use_deleted_data || params.use_compaction {
            let (_, batch) = self.run_query("indexed == 75", None, params).await;
            assert_eq!(batch.num_rows(), 0);
        }
    }

    async fn check_simple_indexed_and_refine(&self, params: &ScalarTestParams) {
        let (query_plan, batch) = self.run_query(
            "indexed != 50 AND ((not_indexed < 100) OR (not_indexed >= 1000 AND not_indexed < 1100))",
            None,
            params
        ).await;
        if self.dataset.is_legacy_storage() {
            assert!(query_plan.contains("MaterializeIndex"));
        } else {
            assert!(query_plan.contains("LanceRead"));
        }
        self.assert_none(
            &batch,
            |val| val == 50,
            "The query contained 50 even though it was filtered",
        );
        self.assert_none(
            &batch,
            |val| (100..1000).contains(&val) || (val >= 1100),
            "The non-indexed refine filter was not applied",
        );

        let mut expected_num_rows = if params.use_new_data || params.use_updated {
            199
        } else {
            99
        };
        if params.use_deleted_data || params.use_compaction {
            expected_num_rows -= 1;
        }
        assert_eq!(batch.num_rows(), expected_num_rows);
    }

    async fn check_simple_queries(&self, params: &ScalarTestParams) {
        self.check_simple_indexed_only(params).await;
        self.check_simple_indexed_and_refine(params).await;
    }
}

#[rstest]
#[tokio::test]
async fn test_secondary_index_scans(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] use_stable_row_ids: bool,
) {
    let fixture = Box::pin(ScalarIndexTestFixture::new(
        data_storage_version,
        use_stable_row_ids,
    ))
    .await;

    for use_index in [false, true] {
        for use_projection in [false, true] {
            for use_deleted_data in [false, true] {
                for use_new_data in [false, true] {
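                    // No fixture version combines compaction with new, deleted,
                    // or stable-row-id data, so skip those combinations.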
                    let compaction_choices =
                        if use_deleted_data || use_new_data || use_stable_row_ids {
                            vec![false]
                        } else {
                            vec![false, true]
                        };
                    for use_compaction in compaction_choices {
                        let updated_choices =
                            if use_deleted_data || use_new_data || use_compaction {
                                vec![false]
                            } else {
                                vec![false, true]
                            };
                        for use_updated in updated_choices {
                            for with_row_id in [false, true] {
                                let params = ScalarTestParams {
                                    use_index,
                                    use_projection,
                                    use_deleted_data,
                                    use_new_data,
                                    with_row_id,
                                    use_compaction,
                                    use_updated,
                                };
                                fixture.check_vector_queries(&params).await;
                                fixture.check_simple_queries(&params).await;
                            }
                        }
                    }
                }
            }
        }
    }
}

#[tokio::test]
async fn can_filter_row_id() {
    let dataset = lance_datagen::gen_batch()
        .col("x", array::step::<Int32Type>())
        .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(1000))
        .await
        .unwrap();

    let mut scan = dataset.scan();
    scan.with_row_id();
    scan.project::<&str>(&[]).unwrap();
    scan.filter("_rowid == 50").unwrap();
    let batch = scan.try_into_batch().await.unwrap();
    assert_eq!(batch.num_rows(), 1);
    assert_eq!(batch.column(0).as_primitive::<UInt64Type>().values()[0], 50);
}

#[rstest]
#[tokio::test]
async fn test_index_take_batch_size() {
    let fixture = Box::pin(ScalarIndexTestFixture::new(LanceFileVersion::Stable, false)).await;
    let stream = fixture
        .dataset
        .scan()
        .filter("indexed > 0")
        .unwrap()
        .batch_size(16)
        .try_into_stream()
        .await
        .unwrap();
    let batches = stream.collect::<Vec<_>>().await;
    assert_eq!(batches.len(), 1000_usize.div_ceil(16));
}

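// Build a scan with the given closure and assert that its physical plan
// matches the expected plan text.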
async fn assert_plan_equals(
    dataset: &Dataset,
    plan: impl Fn(&mut Scanner) -> Result<&mut Scanner>,
    expected: &str,
) -> Result<()> {
    let mut scan = dataset.scan();
    plan(&mut scan)?;
    let exec_plan = scan.create_plan().await?;
    assert_plan_node_equals(exec_plan, expected).await
}

#[tokio::test]
async fn test_count_plan() {
    let dim = 256;
    let fixture = TestVectorDataset::new_with_dimension(LanceFileVersion::Stable, true, dim)
        .await
        .unwrap();

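    // A count plan can only be created when the scan outputs just the row id column.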
    let err = fixture
        .dataset
        .scan()
        .create_count_plan()
        .await
        .unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }));

    let mut scan = fixture.dataset.scan();
    scan.project(&Vec::<String>::default()).unwrap();

    let err = scan.create_count_plan().await.unwrap_err();
    assert!(matches!(err, Error::InvalidInput { .. }));

    scan.with_row_id();

    let plan = scan.create_count_plan().await.unwrap();

    assert_plan_node_equals(
        plan,
        "AggregateExec: mode=Single, gby=[], aggr=[count_rows]
  LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=--",
    )
    .await
    .unwrap();

    scan.filter("s == ''").unwrap();

    let plan = scan.create_count_plan().await.unwrap();

    assert_plan_node_equals(
        plan,
        "AggregateExec: mode=Single, gby=[], aggr=[count_rows]
  ProjectionExec: expr=[_rowid@1 as _rowid]
    LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=s = Utf8(\"\"), refine_filter=s = Utf8(\"\")",
    )
    .await
    .unwrap();
}

#[tokio::test]
async fn test_inexact_scalar_index_plans() {
    let data = gen_batch()
        .col("ngram", array::rand_utf8(ByteCount::from(5), false))
        .col("exact", array::rand_type(&DataType::UInt32))
        .col("no_index", array::rand_type(&DataType::UInt32))
        .into_reader_rows(RowCount::from(1000), BatchCount::from(5));

    let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap();
    dataset
        .create_index(
            &["ngram"],
            IndexType::NGram,
            None,
            &ScalarIndexParams::default(),
            true,
        )
        .await
        .unwrap();
    dataset
        .create_index(
            &["exact"],
            IndexType::BTree,
            None,
            &ScalarIndexParams::default(),
            true,
        )
        .await
        .unwrap();

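    // Filters over the ngram (inexact) index alone, combined with the exact
    // btree index, and finally combined with an unindexed column that forces a refine.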
    assert_plan_equals(
        &dataset,
        |scanner| scanner.filter("contains(ngram, 'test string')"),
        "LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, \
         range_before=None, range_after=None, row_id=false, row_addr=false, \
         full_filter=contains(ngram, Utf8(\"test string\")), refine_filter=--
  ScalarIndexQuery: query=[contains(ngram, Utf8(\"test string\"))]@ngram_idx",
    )
    .await
    .unwrap();

    assert_plan_equals(
        &dataset,
        |scanner| scanner.filter("contains(ngram, 'test string') and exact < 50"),
        "LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, \
         range_before=None, range_after=None, row_id=false, row_addr=false, \
         full_filter=contains(ngram, Utf8(\"test string\")) AND exact < UInt32(50), \
         refine_filter=--
  ScalarIndexQuery: query=AND([contains(ngram, Utf8(\"test string\"))]@ngram_idx,[exact < 50]@exact_idx)",
    )
    .await
    .unwrap();

    assert_plan_equals(
        &dataset,
        |scanner| {
            scanner.filter("contains(ngram, 'test string') and exact < 50 AND no_index > 100")
        },
        "ProjectionExec: expr=[ngram@0 as ngram, exact@1 as exact, no_index@2 as no_index]
  LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, range_before=None, \
         range_after=None, row_id=true, row_addr=false, full_filter=contains(ngram, Utf8(\"test string\")) AND exact < UInt32(50) AND no_index > UInt32(100), \
         refine_filter=no_index > UInt32(100)
    ScalarIndexQuery: query=AND([contains(ngram, Utf8(\"test string\"))]@ngram_idx,[exact < 50]@exact_idx)",
    )
    .await
    .unwrap();
}

#[rstest]
#[tokio::test]
async fn test_late_materialization(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) {
    use lance_io::assert_io_lt;
    use lance_table::io::commit::RenameCommitHandler;

    let data = gen_batch()
        .col(
            "vector",
            array::rand_vec::<Float32Type>(Dimension::from(32)),
        )
        .col("indexed", array::step::<Int32Type>())
        .col("not_indexed", array::step::<Int32Type>())
        .into_reader_rows(RowCount::from(1000), BatchCount::from(20));

    let io_tracker = Arc::new(IOTracker::default());
    let mut dataset = Dataset::write(
        data,
        "memory://test",
        Some(WriteParams {
            store_params: Some(ObjectStoreParams {
                object_store_wrapper: Some(io_tracker.clone()),
                ..Default::default()
            }),
            commit_handler: Some(Arc::new(RenameCommitHandler)),
            data_storage_version: Some(data_storage_version),
            ..Default::default()
        }),
    )
    .await
    .unwrap();
    dataset
        .create_index(
            &["indexed"],
            IndexType::Scalar,
            None,
            &ScalarIndexParams::default(),
            false,
        )
        .await
        .unwrap();

    // Reset the IO counter, then measure the cost of a full scan.
    let _ = io_tracker.incremental_stats();
    dataset.scan().try_into_batch().await.unwrap();
    let io_stats = io_tracker.incremental_stats();
    let full_scan_bytes = io_stats.read_bytes;

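    // A filtered scan with late materialization should read fewer bytes than a full scan.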
    dataset
        .scan()
        .use_stats(false)
        .filter("not_indexed = 50")
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    let io_stats = io_tracker.incremental_stats();
    assert_io_lt!(io_stats, read_bytes, full_scan_bytes);
    let filtered_scan_bytes = io_stats.read_bytes;

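    // With stats-based pushdown (legacy format only) the filtered scan reads even less.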
    if data_storage_version == LanceFileVersion::Legacy {
        dataset
            .scan()
            .filter("not_indexed = 50")
            .unwrap()
            .try_into_batch()
            .await
            .unwrap();
        let io_stats = io_tracker.incremental_stats();
        assert_io_lt!(io_stats, read_bytes, filtered_scan_bytes);
    }

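    // A filter on the indexed column should use the index and read less than a full scan.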
    dataset
        .scan()
        .filter("indexed = 50")
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    let io_stats = io_tracker.incremental_stats();
    assert_io_lt!(io_stats, read_bytes, full_scan_bytes);
    let index_scan_bytes = io_stats.read_bytes;

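    // Running the same indexed query again should hit the index cache and read even less.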
    dataset
        .scan()
        .filter("indexed = 50")
        .unwrap()
        .try_into_batch()
        .await
        .unwrap();
    let io_stats = io_tracker.incremental_stats();
    assert_io_lt!(io_stats, read_bytes, index_scan_bytes);
}

#[rstest]
#[tokio::test]
async fn test_project_nested(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
) -> Result<()> {
    let struct_i_field = ArrowField::new("i", DataType::Int32, true);
    let struct_o_field = ArrowField::new("o", DataType::Utf8, true);
    let schema = Arc::new(ArrowSchema::new(vec![
        ArrowField::new(
            "struct",
            DataType::Struct(vec![struct_i_field.clone(), struct_o_field.clone()].into()),
            true,
        ),
        ArrowField::new("s", DataType::Utf8, true),
    ]));

    let input_batches: Vec<RecordBatch> = (0..5)
        .map(|i| {
            let struct_i_arr: Arc<Int32Array> =
                Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20));
            let struct_o_arr: Arc<StringArray> = Arc::new(StringArray::from_iter_values(
                (i * 20..(i + 1) * 20).map(|v| format!("o-{:02}", v)),
            ));
            RecordBatch::try_new(
                schema.clone(),
                vec![
                    Arc::new(StructArray::from(vec![
                        (Arc::new(struct_i_field.clone()), struct_i_arr as ArrayRef),
                        (Arc::new(struct_o_field.clone()), struct_o_arr as ArrayRef),
                    ])),
                    Arc::new(StringArray::from_iter_values(
                        (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
                    )),
                ],
            )
            .unwrap()
        })
        .collect();
    let batches =
        RecordBatchIterator::new(input_batches.clone().into_iter().map(Ok), schema.clone());
    let test_dir = TempStrDir::default();
    let test_uri = &test_dir;
    let write_params = WriteParams {
        max_rows_per_file: 40,
        max_rows_per_group: 10,
        data_storage_version: Some(data_storage_version),
        ..Default::default()
    };
    Dataset::write(batches, test_uri, Some(write_params))
        .await
        .unwrap();

    let dataset = Dataset::open(test_uri).await.unwrap();

    let batches = dataset
        .scan()
        .project(&["struct.i"])
        .unwrap()
        .try_into_stream()
        .await
        .unwrap()
        .try_collect::<Vec<_>>()
        .await
        .unwrap();
    let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
    assert!(batch.column_by_name("struct.i").is_some());
    Ok(())
}

#[rstest]
#[tokio::test]
async fn test_plans(
    #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
    data_storage_version: LanceFileVersion,
    #[values(false, true)] stable_row_id: bool,
) -> Result<()> {
    use lance_index::scalar::inverted::query::BoostQuery;

    let dim = 256;
    let mut dataset =
        TestVectorDataset::new_with_dimension(data_storage_version, stable_row_id, dim).await?;
    let lance_schema = dataset.dataset.schema();

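    // The pushdown scan node only exists for the legacy storage format.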
6621 if data_storage_version == LanceFileVersion::Legacy {
6625 log::info!("Test case: Pushdown scan");
6626 assert_plan_equals(
6627 &dataset.dataset,
6628 |scan| scan.project(&["s"])?.filter("i > 10 and i < 20"),
6629 "LancePushdownScan: uri=..., projection=[s], predicate=i > Int32(10) AND i < Int32(20), row_id=false, row_addr=false, ordered=true"
6630 ).await?;
6631 }
6632
6633 log::info!("Test case: Project and filter");
6634 let expected = if data_storage_version == LanceFileVersion::Legacy {
6635 "ProjectionExec: expr=[s@2 as s]
6636 Take: columns=\"i, _rowid, (s)\"
6637 CoalesceBatchesExec: target_batch_size=8192
6638 FilterExec: i@0 > 10 AND i@0 < 20
6639 LanceScan: uri..., projection=[i], row_id=true, row_addr=false, ordered=true, range=None"
6640 } else {
6641 "ProjectionExec: expr=[s@2 as s]
6642 Take: columns=\"i, _rowid, (s)\"
6643 CoalesceBatchesExec: target_batch_size=8192
6644 LanceRead: ..., projection=[i], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10) AND i < Int32(20), refine_filter=i > Int32(10) AND i < Int32(20)"
6645 };
6646 assert_plan_equals(
6647 &dataset.dataset,
6648 |scan| {
6649 scan.use_stats(false)
6650 .project(&["s"])?
6651 .filter("i > 10 and i < 20")
6652 },
6653 expected,
6654 )
6655 .await?;
6656
6657 log::info!("Test case: Late materialization");
6660 let expected = if data_storage_version == LanceFileVersion::Legacy {
6661 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@3 as vec]
6662 Take: columns=\"i, s, _rowid, (vec)\"
6663 CoalesceBatchesExec: target_batch_size=8192
6664 FilterExec: s@1 IS NOT NULL
6665 LanceScan: uri..., projection=[i, s], row_id=true, row_addr=false, ordered=true, range=None"
6666 } else {
6667 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@3 as vec]
6668 Take: columns=\"i, s, _rowid, (vec)\"
6669 CoalesceBatchesExec: target_batch_size=8192
6670 LanceRead: uri=..., projection=[i, s], num_fragments=2, range_before=None, range_after=None, \
6671 row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
6672 };
6673 assert_plan_equals(
6674 &dataset.dataset,
6675 |scan| scan.use_stats(false).filter("s IS NOT NULL"),
6676 expected,
6677 )
6678 .await?;
6679
        log::info!("Test case: Custom materialization (all early)");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@2 as vec]
  FilterExec: s@1 IS NOT NULL
    LanceScan: uri..., projection=[i, s, vec], row_id=true, row_addr=false, ordered=true, range=None"
        } else {
            "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@2 as vec]
  LanceRead: uri=..., projection=[i, s, vec], num_fragments=2, range_before=None, \
    range_after=None, row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.use_stats(false)
                    .materialization_style(MaterializationStyle::AllEarly)
                    .filter("s IS NOT NULL")
            },
            expected,
        )
        .await?;

        log::info!("Test case: Custom materialization 2 (all late)");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@2 as i, s@0 as s, vec@3 as vec]
  Take: columns=\"s, _rowid, (i), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: s@0 IS NOT NULL
        LanceScan: uri..., projection=[s], row_id=true, row_addr=false, ordered=true, range=None"
        } else {
            "ProjectionExec: expr=[i@2 as i, s@0 as s, vec@3 as vec]
  Take: columns=\"s, _rowid, (i), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, \
        range_after=None, row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.use_stats(false)
                    .materialization_style(MaterializationStyle::AllLate)
                    .filter("s IS NOT NULL")
            },
            expected,
        )
        .await?;

        log::info!("Test case: Custom materialization 3 (mixed)");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@3 as i, s@0 as s, vec@1 as vec]
  Take: columns=\"s, vec, _rowid, (i)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: s@0 IS NOT NULL
        LanceScan: uri..., projection=[s, vec], row_id=true, row_addr=false, ordered=true, range=None"
        } else {
            "ProjectionExec: expr=[i@3 as i, s@0 as s, vec@1 as vec]
  Take: columns=\"s, vec, _rowid, (i)\"
    CoalesceBatchesExec: target_batch_size=8192
      LanceRead: uri=..., projection=[s, vec], num_fragments=2, range_before=None, range_after=None, \
        row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.use_stats(false)
                    .materialization_style(
                        MaterializationStyle::all_early_except(&["i"], lance_schema).unwrap(),
                    )
                    .filter("s IS NOT NULL")
            },
            expected,
        )
        .await?;

        log::info!("Test case: Scan out of order");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, range_after=None, row_id=true, \
             row_addr=false, full_filter=--, refine_filter=--"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| Ok(scan.project(&["s"])?.with_row_id().scan_in_order(false)),
            expected,
        )
        .await?;

        let q: Float32Array = (32..32 + dim).map(|v| v as f32).collect();

        log::info!("Test case: Basic KNN");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@2 IS NOT NULL
        SortExec: TopK(fetch=5), expr=...
          KNNVectorDistance: metric=l2
            LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@2 IS NOT NULL
        SortExec: TopK(fetch=5), expr=...
          KNNVectorDistance: metric=l2
            LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
              row_id=true, row_addr=false, full_filter=--, refine_filter=--"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.nearest("vec", &q, 5),
            expected,
        )
        .await?;

        let q: Float32Array = (32..32 + dim).map(|v| v as f32).collect();

        log::info!("Test case: KNN with extraneous limit");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      GlobalLimitExec: skip=0, fetch=1
        FilterExec: _distance@2 IS NOT NULL
          SortExec: TopK(fetch=5), expr=...
            KNNVectorDistance: metric=l2
              LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      GlobalLimitExec: skip=0, fetch=1
        FilterExec: _distance@2 IS NOT NULL
          SortExec: TopK(fetch=5), expr=...
            KNNVectorDistance: metric=l2
              LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
                row_id=true, row_addr=false, full_filter=--, refine_filter=--"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.nearest("vec", &q, 5)?.limit(Some(1), None),
            expected,
        )
        .await?;

        dataset.make_vector_index().await?;

        log::info!("Test case: Basic ANN");
        let expected =
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=42), expr=...
        ANNSubIndex: name=..., k=42, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.nearest("vec", &q, 42),
            expected,
        )
        .await?;

        log::info!("Test case: ANN with refine");
        let expected =
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=10), expr=...
          KNNVectorDistance: metric=l2
            Take: columns=\"_distance, _rowid, (vec)\"
              CoalesceBatchesExec: target_batch_size=8192
                SortExec: TopK(fetch=40), expr=...
                  ANNSubIndex: name=..., k=40, deltas=1
                    ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
        assert_plan_equals(
            &dataset.dataset,
            |scan| Ok(scan.nearest("vec", &q, 10)?.refine(4)),
            expected,
        )
        .await?;

        log::info!("Test case: ANN with index disabled");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=13), expr=...
          KNNVectorDistance: metric=l2
            LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
  Take: columns=\"vec, _rowid, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=13), expr=...
          KNNVectorDistance: metric=l2
            LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
              row_id=true, row_addr=false, full_filter=--, refine_filter=--"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| Ok(scan.nearest("vec", &q, 13)?.use_index(false)),
            expected,
        )
        .await?;

        log::info!("Test case: ANN with postfilter");
        let expected = "ProjectionExec: expr=[s@3 as s, vec@4 as vec, _distance@0 as _distance, _rowid@1 as _rowid]
  Take: columns=\"_distance, _rowid, i, (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: i@2 > 10
        Take: columns=\"_distance, _rowid, (i)\"
          CoalesceBatchesExec: target_batch_size=8192
            SortExec: TopK(fetch=17), expr=...
              ANNSubIndex: name=..., k=17, deltas=1
                ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 17)?
                    .filter("i > 10")?
                    .project(&["s", "vec"])?
                    .with_row_id())
            },
            expected,
        )
        .await?;

        log::info!("Test case: ANN with prefilter");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=17), expr=...
        ANNSubIndex: name=..., k=17, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
          FilterExec: i@0 > 10
            LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=17), expr=...
        ANNSubIndex: name=..., k=17, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
          LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \
            row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 17)?
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        dataset.append_new_data().await?;
        log::info!("Test case: Combined KNN/ANN");
        let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=6), expr=...
          KNNVectorDistance: metric=l2
            RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
              UnionExec
                ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
                  FilterExec: _distance@... IS NOT NULL
                    SortExec: TopK(fetch=6), expr=...
                      KNNVectorDistance: metric=l2
                        LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
                Take: columns=\"_distance, _rowid, (vec)\"
                  CoalesceBatchesExec: target_batch_size=8192
                    SortExec: TopK(fetch=6), expr=...
                      ANNSubIndex: name=..., k=6, deltas=1
                        ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.nearest("vec", &q, 6),
            expected,
        )
        .await?;

        log::info!("Test case: Combined KNN/ANN with postfilter");
        let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, i, (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: i@3 > 10
        Take: columns=\"_rowid, vec, _distance, (i)\"
          CoalesceBatchesExec: target_batch_size=8192
            FilterExec: _distance@... IS NOT NULL
              SortExec: TopK(fetch=15), expr=...
                KNNVectorDistance: metric=l2
                  RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
                    UnionExec
                      ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
                        FilterExec: _distance@... IS NOT NULL
                          SortExec: TopK(fetch=15), expr=...
                            KNNVectorDistance: metric=l2
                              LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
                      Take: columns=\"_distance, _rowid, (vec)\"
                        CoalesceBatchesExec: target_batch_size=8192
                          SortExec: TopK(fetch=15), expr=...
                            ANNSubIndex: name=..., k=15, deltas=1
                              ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.nearest("vec", &q, 15)?.filter("i > 10"),
            expected,
        )
        .await?;

        log::info!("Test case: Combined KNN/ANN with prefilter");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=5), expr=...
          KNNVectorDistance: metric=l2
            RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
              UnionExec
                ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
                  FilterExec: _distance@... IS NOT NULL
                    SortExec: TopK(fetch=5), expr=...
                      KNNVectorDistance: metric=l2
                        FilterExec: i@1 > 10
                          LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
                Take: columns=\"_distance, _rowid, (vec)\"
                  CoalesceBatchesExec: target_batch_size=8192
                    SortExec: TopK(fetch=5), expr=...
                      ANNSubIndex: name=..., k=5, deltas=1
                        ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
                        FilterExec: i@0 > 10
                          LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=5), expr=...
          KNNVectorDistance: metric=l2
            RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
              UnionExec
                ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
                  FilterExec: _distance@... IS NOT NULL
                    SortExec: TopK(fetch=5), expr=...
                      KNNVectorDistance: metric=l2
                        FilterExec: i@1 > 10
                          LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
                Take: columns=\"_distance, _rowid, (vec)\"
                  CoalesceBatchesExec: target_batch_size=8192
                    SortExec: TopK(fetch=5), expr=...
                      ANNSubIndex: name=..., k=5, deltas=1
                        ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
                        LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \
                          row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 5)?
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        dataset.make_vector_index().await?;
        dataset.make_scalar_index().await?;

        log::info!("Test case: ANN with scalar index");
        let expected =
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=5), expr=...
        ANNSubIndex: name=..., k=5, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
          ScalarIndexQuery: query=[i > 10]@i_idx";
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 5)?
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        log::info!("Test case: ANN with scalar index disabled");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=5), expr=...
        ANNSubIndex: name=..., k=5, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
          FilterExec: i@0 > 10
            LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
  Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: TopK(fetch=5), expr=...
        ANNSubIndex: name=..., k=5, deltas=1
          ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
          LanceRead: uri=..., projection=[], num_fragments=3, range_before=None, \
            range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 5)?
                    .use_scalar_index(false)
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        dataset.append_new_data().await?;

        log::info!("Test case: Combined KNN/ANN with scalar index");
        let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=8), expr=...
          KNNVectorDistance: metric=l2
            RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
              UnionExec
                ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
                  FilterExec: _distance@... IS NOT NULL
                    SortExec: TopK(fetch=8), expr=...
                      KNNVectorDistance: metric=l2
                        FilterExec: i@1 > 10
                          LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
                Take: columns=\"_distance, _rowid, (vec)\"
                  CoalesceBatchesExec: target_batch_size=8192
                    SortExec: TopK(fetch=8), expr=...
                      ANNSubIndex: name=..., k=8, deltas=1
                        ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
                        ScalarIndexQuery: query=[i > 10]@i_idx";
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 8)?
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        log::info!(
            "Test case: Combined KNN/ANN with updated scalar index and outdated vector index"
        );
        let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
  Take: columns=\"_rowid, vec, _distance, (i), (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      FilterExec: _distance@... IS NOT NULL
        SortExec: TopK(fetch=11), expr=...
          KNNVectorDistance: metric=l2
            RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
              UnionExec
                ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
                  FilterExec: _distance@... IS NOT NULL
                    SortExec: TopK(fetch=11), expr=...
                      KNNVectorDistance: metric=l2
                        FilterExec: i@1 > 10
                          LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
                Take: columns=\"_distance, _rowid, (vec)\"
                  CoalesceBatchesExec: target_batch_size=8192
                    SortExec: TopK(fetch=11), expr=...
                      ANNSubIndex: name=..., k=11, deltas=1
                        ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
                        ScalarIndexQuery: query=[i > 10]@i_idx";
        dataset.make_scalar_index().await?;
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                Ok(scan
                    .nearest("vec", &q, 11)?
                    .filter("i > 10")?
                    .prefilter(true))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Filtered read with scalar index");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[s@1 as s]
  Take: columns=\"_rowid, (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      MaterializeIndex: query=[i > 10]@i_idx"
        } else {
            "LanceRead: uri=..., projection=[s], num_fragments=4, range_before=None, \
             range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
  ScalarIndexQuery: query=[i > 10]@i_idx"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.project(&["s"])?.filter("i > 10"),
            expected,
        )
        .await?;

        if data_storage_version != LanceFileVersion::Legacy {
            log::info!(
                "Test case: Filtered read with scalar index disabled (late materialization)"
            );
            assert_plan_equals(
                &dataset.dataset,
                |scan| {
                    scan.project(&["s"])?
                        .use_scalar_index(false)
                        .filter("i > 10")
                },
                "ProjectionExec: expr=[s@2 as s]
  Take: columns=\"i, _rowid, (s)\"
    CoalesceBatchesExec: target_batch_size=8192
      LanceRead: uri=..., projection=[i], num_fragments=4, range_before=None, \
        range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)",
            )
            .await?;
        }

        log::info!("Test case: Empty projection");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[_rowaddr@0 as _rowaddr]
  AddRowAddrExec
    MaterializeIndex: query=[i > 10]@i_idx"
        } else {
            "LanceRead: uri=..., projection=[], num_fragments=4, range_before=None, \
             range_after=None, row_id=false, row_addr=true, full_filter=i > Int32(10), refine_filter=--
  ScalarIndexQuery: query=[i > 10]@i_idx"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.filter("i > 10")
                    .unwrap()
                    .with_row_address()
                    .project::<&str>(&[])
            },
            expected,
        )
        .await?;

        dataset.append_new_data().await?;
        log::info!("Test case: Combined Scalar/non-scalar filtered read");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[s@1 as s]
  RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
    UnionExec
      Take: columns=\"_rowid, (s)\"
        CoalesceBatchesExec: target_batch_size=8192
          MaterializeIndex: query=[i > 10]@i_idx
      ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s]
        FilterExec: i@0 > 10
          LanceScan: uri=..., projection=[i, s], row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "LanceRead: uri=..., projection=[s], num_fragments=5, range_before=None, \
             range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
  ScalarIndexQuery: query=[i > 10]@i_idx"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| scan.project(&["s"])?.filter("i > 10"),
            expected,
        )
        .await?;

        log::info!("Test case: Combined Scalar/non-scalar filtered read with empty projection");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[_rowaddr@0 as _rowaddr]
  RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
    UnionExec
      AddRowAddrExec
        MaterializeIndex: query=[i > 10]@i_idx
      ProjectionExec: expr=[_rowaddr@2 as _rowaddr, _rowid@1 as _rowid]
        FilterExec: i@0 > 10
          LanceScan: uri=..., projection=[i], row_id=true, row_addr=true, ordered=false, range=None"
        } else {
            "LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, \
             range_after=None, row_id=false, row_addr=true, full_filter=i > Int32(10), refine_filter=--
  ScalarIndexQuery: query=[i > 10]@i_idx"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.filter("i > 10")
                    .unwrap()
                    .with_row_address()
                    .project::<&str>(&[])
            },
            expected,
        )
        .await?;

        log::info!("Test case: Dynamic projection");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            "ProjectionExec: expr=[regexp_match(s@1, .*) as matches]
  RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
    UnionExec
      Take: columns=\"_rowid, (s)\"
        CoalesceBatchesExec: target_batch_size=8192
          MaterializeIndex: query=[i > 10]@i_idx
      ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s]
        FilterExec: i@0 > 10
          LanceScan: uri=..., row_id=true, row_addr=false, ordered=false, range=None"
        } else {
            "ProjectionExec: expr=[regexp_match(s@0, .*) as matches]
  LanceRead: uri=..., projection=[s], num_fragments=5, range_before=None, \
    range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
    ScalarIndexQuery: query=[i > 10]@i_idx"
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.project_with_transform(&[("matches", "regexp_match(s, \".*\")")])?
                    .filter("i > 10")
            },
            expected,
        )
        .await?;

        dataset.make_fts_index().await?;

        log::info!("Test case: Full text search (match query)");
        let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      MatchQuery: query=hello"#;
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.project(&["s"])?
                    .with_row_id()
                    .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Full text search (phrase query)");
        let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      PhraseQuery: query=hello world"#;
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                let query = PhraseQuery::new("hello world".to_owned());
                scan.project(&["s"])?
                    .with_row_id()
                    .full_text_search(FullTextSearchQuery::new_query(query.into()))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Full text search (boost query)");
        let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      BoostQuery: negative_boost=1
        MatchQuery: query=hello
        MatchQuery: query=world"#;
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                let positive =
                    MatchQuery::new("hello".to_owned()).with_column(Some("s".to_owned()));
                let negative =
                    MatchQuery::new("world".to_owned()).with_column(Some("s".to_owned()));
                let query = BoostQuery::new(positive.into(), negative.into(), Some(1.0));
                scan.project(&["s"])?
                    .with_row_id()
                    .full_text_search(FullTextSearchQuery::new_query(query.into()))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Full text search with prefilter");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      MatchQuery: query=hello
        RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
          UnionExec
            MaterializeIndex: query=[i > 10]@i_idx
            ProjectionExec: expr=[_rowid@1 as _rowid]
              FilterExec: i@0 > 10
                LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"#
        } else {
            r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      MatchQuery: query=hello
        LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
          ScalarIndexQuery: query=[i > 10]@i_idx"#
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.project(&["s"])?
                    .with_row_id()
                    .filter("i > 10")?
                    .prefilter(true)
                    .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Full text search with unindexed rows");
        let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
        RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
          UnionExec
            MatchQuery: query=hello
            FlatMatchQuery: query=hello
              LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"#;
        dataset.append_new_data().await?;
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.project(&["s"])?
                    .with_row_id()
                    .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
            },
            expected,
        )
        .await?;

        log::info!("Test case: Full text search with unindexed rows and prefilter");
        let expected = if data_storage_version == LanceFileVersion::Legacy {
            r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
        RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
          UnionExec
            MatchQuery: query=hello
              RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
                UnionExec
                  MaterializeIndex: query=[i > 10]@i_idx
                  ProjectionExec: expr=[_rowid@1 as _rowid]
                    FilterExec: i@0 > 10
                      LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None
            FlatMatchQuery: query=hello
              FilterExec: i@1 > 10
                LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
        } else {
            r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
  Take: columns="_rowid, _score, (s)"
    CoalesceBatchesExec: target_batch_size=8192
      SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
        RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
          UnionExec
            MatchQuery: query=hello
              LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
                ScalarIndexQuery: query=[i > 10]@i_idx
            FlatMatchQuery: query=hello
              FilterExec: i@1 > 10
                LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
        };
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.project(&["s"])?
                    .with_row_id()
                    .filter("i > 10")?
                    .prefilter(true)
                    .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
            },
            expected,
        )
        .await?;

        Ok(())
    }

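    // fast_search restricts the query to indexed fragments only, so the plan is
    // just sort + ANN nodes; without it the plan must union the ANN results with
    // a flat KNN over the unindexed fragment.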
    #[tokio::test]
    async fn test_fast_search_plan() {
        let mut dataset = TestVectorDataset::new(LanceFileVersion::Stable, true)
            .await
            .unwrap();
        dataset.make_vector_index().await.unwrap();
        dataset.append_new_data().await.unwrap();

        let q: Float32Array = (32..64).map(|v| v as f32).collect();

        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.nearest("vec", &q, 32)?
                    .fast_search()
                    .project(&["_distance", "_rowid"])
            },
            "SortExec: TopK(fetch=32), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
  ANNSubIndex: name=idx, k=32, deltas=1
    ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
        )
        .await
        .unwrap();

        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.nearest("vec", &q, 33)?
                    .fast_search()
                    .with_row_id()
                    .project(&["_distance", "_rowid"])
            },
            "SortExec: TopK(fetch=33), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
  ANNSubIndex: name=idx, k=33, deltas=1
    ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
        )
        .await
        .unwrap();

        // Without fast_search the new, unindexed rows must still be searched.
        assert_plan_equals(
            &dataset.dataset,
            |scan| {
                scan.nearest("vec", &q, 34)?
                    .with_row_id()
                    .project(&["_distance", "_rowid"])
            },
            "ProjectionExec: expr=[_distance@2 as _distance, _rowid@0 as _rowid]
  FilterExec: _distance@2 IS NOT NULL
    SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@0 ASC NULLS LAST]...
      KNNVectorDistance: metric=l2
        RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
          UnionExec
            ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
              FilterExec: _distance@2 IS NOT NULL
                SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
                  KNNVectorDistance: metric=l2
                    LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
            Take: columns=\"_distance, _rowid, (vec)\"
              CoalesceBatchesExec: target_batch_size=8192
                SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
                  ANNSubIndex: name=idx, k=34, deltas=1
                    ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
        )
        .await
        .unwrap();
    }

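    // Plan construction should only perform IO the first time index metadata is
    // loaded; re-planning the same (or an index-free) query must be served
    // entirely from cache with zero read IOPS.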
    #[rstest]
    #[tokio::test]
    pub async fn test_scan_planning_io(
        #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
        data_storage_version: LanceFileVersion,
    ) {
        use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
        // Note: assert_io_gt is assumed to live alongside assert_io_eq in lance_io.
        use lance_io::{assert_io_eq, assert_io_gt};

        let data = gen_batch()
            .col(
                "vector",
                array::rand_vec::<Float32Type>(Dimension::from(32)),
            )
            .col("text", array::rand_utf8(ByteCount::from(4), false))
            .col("indexed", array::step::<Int32Type>())
            .col("not_indexed", array::step::<Int32Type>())
            .into_reader_rows(RowCount::from(100), BatchCount::from(5));

        let io_tracker = Arc::new(IOTracker::default());
        let mut dataset = Dataset::write(
            data,
            "memory://test",
            Some(WriteParams {
                store_params: Some(ObjectStoreParams {
                    object_store_wrapper: Some(io_tracker.clone()),
                    ..Default::default()
                }),
                data_storage_version: Some(data_storage_version),
                ..Default::default()
            }),
        )
        .await
        .unwrap();
        dataset
            .create_index(
                &["indexed"],
                IndexType::Scalar,
                None,
                &ScalarIndexParams::default(),
                false,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &["text"],
                IndexType::Inverted,
                None,
                &InvertedIndexParams::default(),
                false,
            )
            .await
            .unwrap();
        dataset
            .create_index(
                &["vector"],
                IndexType::Vector,
                None,
                &VectorIndexParams {
                    metric_type: DistanceType::L2,
                    stages: vec![
                        StageParams::Ivf(IvfBuildParams {
                            max_iters: 2,
                            num_partitions: Some(2),
                            sample_rate: 2,
                            ..Default::default()
                        }),
                        StageParams::PQ(PQBuildParams {
                            max_iters: 2,
                            num_sub_vectors: 2,
                            ..Default::default()
                        }),
                    ],
                    version: crate::index::vector::IndexFileVersion::Legacy,
                },
                false,
            )
            .await
            .unwrap();

        // The first plan has to load index metadata, so some read IO is expected.
        dataset
            .scan()
            .prefilter(true)
            .filter("indexed > 10")
            .unwrap()
            .explain_plan(true)
            .await
            .unwrap();

        let io_stats = io_tracker.incremental_stats();
        assert_io_gt!(io_stats, read_iops, 0);

        // Planning the same query again should hit cached metadata and do no IO.
        dataset
            .scan()
            .prefilter(true)
            .filter("indexed > 10")
            .unwrap()
            .explain_plan(true)
            .await
            .unwrap();

        let io_stats = io_tracker.incremental_stats();
        assert_io_eq!(io_stats, read_iops, 0);

        dataset
            .scan()
            .prefilter(true)
            .filter("true")
            .unwrap()
            .explain_plan(true)
            .await
            .unwrap();

        let io_stats = io_tracker.incremental_stats();
        assert_io_eq!(io_stats, read_iops, 0);

        dataset
            .scan()
            .prefilter(true)
            .materialization_style(MaterializationStyle::AllEarly)
            .filter("true")
            .unwrap()
            .explain_plan(true)
            .await
            .unwrap();

        let io_stats = io_tracker.incremental_stats();
        assert_io_eq!(io_stats, read_iops, 0);

        dataset
            .scan()
            .prefilter(true)
            .materialization_style(MaterializationStyle::AllLate)
            .filter("true")
            .unwrap()
            .explain_plan(true)
            .await
            .unwrap();

        let io_stats = io_tracker.incremental_stats();
        assert_io_eq!(io_stats, read_iops, 0);
    }

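    // _rowid and _rowaddr can be requested either through an explicit projection
    // or through the with_row_id / with_row_address builder flags; both paths
    // must yield UInt64 columns and agree on the output schema.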
    #[rstest]
    #[tokio::test]
    pub async fn test_row_meta_columns(
        #[values((true, false), (false, true), (true, true))]
        columns: (bool, bool),
    ) {
        let (with_row_id, with_row_address) = columns;
        let test_dir = TempStrDir::default();
        let uri = &test_dir;

        let schema = Arc::new(arrow_schema::Schema::new(vec![
            arrow_schema::Field::new("data_item_id", arrow_schema::DataType::Int32, false),
            arrow_schema::Field::new("a", arrow_schema::DataType::Int32, false),
        ]));

        let data = RecordBatch::try_new(
            schema.clone(),
            vec![
                Arc::new(Int32Array::from(vec![1001, 1002, 1003])),
                Arc::new(Int32Array::from(vec![1, 2, 3])),
            ],
        )
        .unwrap();

        let dataset = Dataset::write(
            RecordBatchIterator::new(vec![Ok(data)], schema.clone()),
            uri,
            None,
        )
        .await
        .unwrap();

        let mut scanner = dataset.scan();

        let mut projection = vec!["data_item_id".to_string()];
        if with_row_id {
            scanner.with_row_id();
            projection.push(ROW_ID.to_string());
        }
        if with_row_address {
            scanner.with_row_address();
            projection.push(ROW_ADDR.to_string());
        }

        scanner.project(&projection).unwrap();
        let stream = scanner.try_into_stream().await.unwrap();
        let batch = stream.try_collect::<Vec<_>>().await.unwrap().pop().unwrap();

        if with_row_id {
            let column = batch.column_by_name(ROW_ID).unwrap();
            assert_eq!(column.data_type(), &DataType::UInt64);
        }
        if with_row_address {
            let column = batch.column_by_name(ROW_ADDR).unwrap();
            assert_eq!(column.data_type(), &DataType::UInt64);
        }

        let mut scanner = dataset.scan();
        if with_row_id {
            scanner.with_row_id();
        }
        if with_row_address {
            scanner.with_row_address();
        }
        scanner.project(&["data_item_id"]).unwrap();
        let stream = scanner.try_into_stream().await.unwrap();
        let batch = stream.try_collect::<Vec<_>>().await.unwrap().pop().unwrap();
        let meta_column = batch.column_by_name(if with_row_id { ROW_ID } else { ROW_ADDR });
        assert!(meta_column.is_some());

        let mut scanner = dataset.scan();
        if with_row_id {
            scanner.project(&[ROW_ID]).unwrap();
        } else {
            scanner.project(&[ROW_ADDR]).unwrap();
        };
        let stream = scanner.try_into_stream().await.unwrap();
        assert_eq!(stream.schema().fields().len(), 1);
        if with_row_id {
            assert!(stream.schema().field_with_name(ROW_ID).is_ok());
        } else {
            assert!(stream.schema().field_with_name(ROW_ADDR).is_ok());
        }
    }

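    // Checks that limit/offset returns exactly the same rows as slicing the full
    // result set; reused below for plain scans, KNN, IVF_PQ, and FTS queries.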
    async fn limit_offset_equivalency_test(scanner: &Scanner) {
        async fn test_one(
            scanner: &Scanner,
            full_result: &RecordBatch,
            limit: Option<i64>,
            offset: Option<i64>,
        ) {
            let mut new_scanner = scanner.clone();
            new_scanner.limit(limit, offset).unwrap();
            if let Some(nearest) = new_scanner.nearest_mut() {
                nearest.k = offset.unwrap_or(0).saturating_add(limit.unwrap_or(10_000)) as usize;
            }
            let result = new_scanner.try_into_batch().await.unwrap();

            let resolved_offset = offset.unwrap_or(0).min(full_result.num_rows() as i64);
            let resolved_length = limit
                .unwrap_or(i64::MAX)
                .min(full_result.num_rows() as i64 - resolved_offset);

            let expected = full_result.slice(resolved_offset as usize, resolved_length as usize);

            if expected != result {
                let plan = new_scanner.analyze_plan().await.unwrap();
                assert_eq!(
                    &expected, &result,
                    "Limit: {:?}, Offset: {:?}, Plan: \n{}",
                    limit, offset, plan
                );
            }
        }

        let mut scanner_full = scanner.clone();
        if let Some(nearest) = scanner_full.nearest_mut() {
            nearest.k = 500;
        }
        let full_results = scanner_full.try_into_batch().await.unwrap();

        test_one(scanner, &full_results, Some(1), None).await;
        test_one(scanner, &full_results, Some(1), Some(1)).await;
        test_one(scanner, &full_results, Some(1), Some(2)).await;
        test_one(scanner, &full_results, Some(1), Some(10)).await;

        test_one(scanner, &full_results, Some(3), None).await;
        test_one(scanner, &full_results, Some(3), Some(2)).await;
        test_one(scanner, &full_results, Some(3), Some(4)).await;

        test_one(scanner, &full_results, None, Some(3)).await;
        test_one(scanner, &full_results, None, Some(10)).await;
    }

    #[tokio::test]
    async fn test_scan_limit_offset() {
        let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
            .await
            .unwrap();
        let scanner = test_ds.dataset.scan();
        limit_offset_equivalency_test(&scanner).await;
    }

    #[tokio::test]
    async fn test_knn_limit_offset() {
        let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
            .await
            .unwrap();
        let query_vector = Float32Array::from(vec![0.0; 32]);
        let mut scanner = test_ds.dataset.scan();
        scanner
            .nearest("vec", &query_vector, 5)
            .unwrap()
            .project(&["i"])
            .unwrap();
        limit_offset_equivalency_test(&scanner).await;
    }

    #[tokio::test]
    async fn test_ivf_pq_limit_offset() {
        let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
            .await
            .unwrap();
        test_ds.make_vector_index().await.unwrap();
        test_ds.append_new_data().await.unwrap();
        let query_vector = Float32Array::from(vec![0.0; 32]);
        let mut scanner = test_ds.dataset.scan();
        scanner.nearest("vec", &query_vector, 500).unwrap();
        limit_offset_equivalency_test(&scanner).await;
    }

    #[tokio::test]
    async fn test_fts_limit_offset() {
        let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
            .await
            .unwrap();
        test_ds.make_fts_index().await.unwrap();
        test_ds.append_new_data().await.unwrap();
        let mut scanner = test_ds.dataset.scan();
        scanner
            .full_text_search(FullTextSearchQuery::new("4".into()))
            .unwrap();
        limit_offset_equivalency_test(&scanner).await;
    }

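    // Runs a scan built by the caller and asserts both the output column names
    // and the _rowoffset values that come back.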
    async fn test_row_offset_read_helper(
        ds: &Dataset,
        scan_builder: impl FnOnce(&mut Scanner) -> &mut Scanner,
        expected_cols: &[&str],
        expected_row_offsets: &[u64],
    ) {
        let mut scanner = ds.scan();
        let scanner = scan_builder(&mut scanner);
        let stream = scanner.try_into_stream().await.unwrap();

        let schema = stream.schema();
        let actual_cols = schema
            .fields()
            .iter()
            .map(|f| f.name().as_str())
            .collect::<Vec<_>>();
        assert_eq!(&actual_cols, expected_cols);

        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
        let batch = arrow_select::concat::concat_batches(&schema, &batches).unwrap();

        let row_offsets = batch
            .column_by_name(ROW_OFFSET)
            .unwrap()
            .as_primitive::<UInt64Type>()
            .values();
        assert_eq!(row_offsets.as_ref(), expected_row_offsets);
    }

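    // After deleting idx 2..=6 only four rows remain (idx 0, 1, 7, 8), so their
    // offsets are 0..=3; a filter on idx then selects a suffix of those offsets.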
    #[tokio::test]
    async fn test_row_offset_read() {
        let mut ds = lance_datagen::gen_batch()
            .col("idx", array::step::<Int32Type>())
            .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(3))
            .await
            .unwrap();
        ds.delete("idx >= 2 AND idx <= 6").await.unwrap();

        test_row_offset_read_helper(
            &ds,
            |scanner| scanner.project(&["idx", ROW_OFFSET]).unwrap(),
            &["idx", ROW_OFFSET],
            &[0, 1, 2, 3],
        )
        .await;

        test_row_offset_read_helper(
            &ds,
            |scanner| scanner.project(&[ROW_OFFSET]).unwrap(),
            &[ROW_OFFSET],
            &[0, 1, 2, 3],
        )
        .await;

        test_row_offset_read_helper(
            &ds,
            |scanner| {
                scanner
                    .filter("idx > 1")
                    .unwrap()
                    .project(&[ROW_OFFSET])
                    .unwrap()
            },
            &[ROW_OFFSET],
            &[2, 3],
        )
        .await;
    }

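    // Filters that pin down exact _rowid/_rowaddr/_rowoffset values should be
    // rewritten into a direct take (surfaced as OneShotStream in the plan)
    // rather than a full scan; contradictory or mixed predicates must fall back
    // to the normal filter path.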
    #[tokio::test]
    async fn test_filter_to_take() {
        let mut ds = lance_datagen::gen_batch()
            .col("idx", array::step::<Int32Type>())
            .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(100))
            .await
            .unwrap();

        let row_ids = ds
            .scan()
            .project(&Vec::<&str>::default())
            .unwrap()
            .with_row_id()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let schema = row_ids[0].schema();
        let row_ids = concat_batches(&schema, row_ids.iter()).unwrap();
        let row_ids = row_ids.column(0).as_primitive::<UInt64Type>().clone();

        let row_addrs = ds
            .scan()
            .project(&Vec::<&str>::default())
            .unwrap()
            .with_row_address()
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();
        let schema = row_addrs[0].schema();
        let row_addrs = concat_batches(&schema, row_addrs.iter()).unwrap();
        let row_addrs = row_addrs.column(0).as_primitive::<UInt64Type>().clone();

        ds.delete("idx >= 190 AND idx < 210").await.unwrap();

        let ds_copy = ds.clone();
        let do_check = async move |filt: &str, expected_idx: &[i32], applies_optimization: bool| {
            let mut scanner = ds_copy.scan();
            scanner.filter(filt).unwrap();
            let plan = scanner.explain_plan(true).await.unwrap();
            if applies_optimization {
                assert!(
                    plan.contains("OneShotStream"),
                    "expected take optimization to be applied. Filter: '{}'. Plan:\n{}",
                    filt,
                    plan
                );
            } else {
                assert!(
                    !plan.contains("OneShotStream"),
                    "expected take optimization to not be applied. Filter: '{}'. Plan:\n{}",
                    filt,
                    plan
                );
            }

            let stream = scanner.try_into_stream().await.unwrap();
            let batches = stream.try_collect::<Vec<_>>().await.unwrap();
            let idx = batches
                .iter()
                .map(|b| b.column_by_name("idx").unwrap().as_ref())
                .collect::<Vec<_>>();

            if idx.is_empty() {
                assert!(expected_idx.is_empty());
                return;
            }

            let idx = arrow::compute::concat(&idx).unwrap();
            assert_eq!(idx.as_primitive::<Int32Type>().values(), expected_idx);
        };
        let check =
            async |filt: &str, expected_idx: &[i32]| do_check(filt, expected_idx, true).await;
        let check_no_opt = async |filt: &str, expected_idx: &[i32]| {
            do_check(filt, expected_idx, false).await;
        };

        check("_rowid = 50", &[50]).await;
        check("_rowaddr = 50", &[50]).await;
        check("_rowoffset = 50", &[50]).await;

        check(
            "_rowid = 50 OR _rowid = 51 OR _rowid = 52 OR _rowid = 49",
            &[49, 50, 51, 52],
        )
        .await;
        check(
            "_rowaddr = 50 OR _rowaddr = 51 OR _rowaddr = 52 OR _rowaddr = 49",
            &[49, 50, 51, 52],
        )
        .await;
        check(
            "_rowoffset = 50 OR _rowoffset = 51 OR _rowoffset = 52 OR _rowoffset = 49",
            &[49, 50, 51, 52],
        )
        .await;

        check("_rowid IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;
        check("_rowaddr IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;
        check("_rowoffset IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;

        // Rows with idx 190..210 were deleted, so their old ids and addresses
        // match nothing, while row offset 190 now lands on the row with idx 210.
        check(&format!("_rowid = {}", row_ids.value(190)), &[]).await;
        check(&format!("_rowaddr = {}", row_addrs.value(190)), &[]).await;
        check("_rowoffset = 190", &[210]).await;

        check(&format!("_rowid = {}", row_ids.value(250)), &[250]).await;
        check(&format!("_rowaddr = {}", row_addrs.value(250)), &[250]).await;
        check("_rowoffset = 250", &[270]).await;

        // An out-of-bounds offset matches nothing.
        check("_rowoffset = 1000", &[]).await;

        check("_rowid IN (5, 10, 15) AND idx > 10", &[15]).await;
        check("_rowaddr IN (5, 10, 15) AND idx > 10", &[15]).await;
        check("_rowoffset IN (5, 10, 15) AND idx > 10", &[15]).await;
        check("idx > 10 AND _rowid IN (5, 10, 15)", &[15]).await;
        check("idx > 10 AND _rowaddr IN (5, 10, 15)", &[15]).await;
        check("idx > 10 AND _rowoffset IN (5, 10, 15)", &[15]).await;
        check("_rowid = 50 AND _rowid = 50", &[50]).await;

        check_no_opt("_rowid = 50 AND _rowid = 51", &[]).await;
        check_no_opt("(_rowid = 50 AND idx < 100) OR _rowid = 51", &[50, 51]).await;

        // The take path must still honor projection transforms.
        let mut scanner = ds.scan();
        scanner.filter("_rowoffset = 77").unwrap();
        scanner
            .project_with_transform(&[("foo", "idx * 2")])
            .unwrap();
        let stream = scanner.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();
        assert_eq!(batches[0].schema().field(0).name(), "foo");
        let val = batches[0].column(0).as_primitive::<Int32Type>().values()[0];
        assert_eq!(val, 154);
    }

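    // order_by must support dotted paths into struct fields ("nested.value").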
    #[tokio::test]
    async fn test_nested_field_ordering() {
        use arrow_array::StructArray;

        let id_array = Int32Array::from(vec![3, 1, 2]);
        let nested_values = Int32Array::from(vec![30, 10, 20]);
        let nested_struct = StructArray::from(vec![(
            Arc::new(ArrowField::new("value", DataType::Int32, false)),
            Arc::new(nested_values) as ArrayRef,
        )]);

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new(
                "nested",
                DataType::Struct(vec![ArrowField::new("value", DataType::Int32, false)].into()),
                false,
            ),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(id_array), Arc::new(nested_struct)],
        )
        .unwrap();

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());

        let dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        let mut scanner = dataset.scan();
        scanner
            .order_by(Some(vec![ColumnOrdering {
                column_name: "nested.value".to_string(),
                ascending: true,
                nulls_first: true,
            }]))
            .unwrap();
        let stream = scanner.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        // Sorting on nested.value (10, 20, 30) should yield ids 1, 2, 3.
        let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
        assert_eq!(sorted_ids[0], 1);
        assert_eq!(sorted_ids[1], 2);
        assert_eq!(sorted_ids[2], 3);
    }

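    // With an ordering, the limit applies after the sort (a global top-k);
    // without one, the limit is pushed down and rows return in insertion order.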
    #[tokio::test]
    async fn test_limit_with_ordering_not_pushed_down() {
        let id_array = Int32Array::from(vec![5, 2, 8, 1, 3, 7, 4, 6]);
        let value_array = Int32Array::from(vec![50, 20, 80, 10, 30, 70, 40, 60]);

        let schema = Arc::new(ArrowSchema::new(vec![
            ArrowField::new("id", DataType::Int32, false),
            ArrowField::new("value", DataType::Int32, false),
        ]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(id_array), Arc::new(value_array)],
        )
        .unwrap();

        let test_dir = TempStrDir::default();
        let test_uri = &test_dir;
        let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());

        let dataset = Dataset::write(reader, test_uri, None).await.unwrap();

        // Limit without offset: the three smallest values, in sorted order.
        let mut scanner = dataset.scan();
        scanner
            .order_by(Some(vec![ColumnOrdering {
                column_name: "value".to_string(),
                ascending: true,
                nulls_first: true,
            }]))
            .unwrap();
        scanner.limit(Some(3), None).unwrap();

        let stream = scanner.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
        let sorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(sorted_ids[0], 1);
        assert_eq!(sorted_ids[1], 2);
        assert_eq!(sorted_ids[2], 3);
        assert_eq!(sorted_values[0], 10);
        assert_eq!(sorted_values[1], 20);
        assert_eq!(sorted_values[2], 30);

        // Limit with offset: skip the two smallest values, then take three.
        let mut scanner = dataset.scan();
        scanner
            .order_by(Some(vec![ColumnOrdering {
                column_name: "value".to_string(),
                ascending: true,
                nulls_first: true,
            }]))
            .unwrap();
        scanner.limit(Some(3), Some(2)).unwrap();

        let stream = scanner.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
        let sorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
        assert_eq!(batches[0].num_rows(), 3);
        assert_eq!(sorted_ids[0], 3);
        assert_eq!(sorted_ids[1], 4);
        assert_eq!(sorted_ids[2], 5);
        assert_eq!(sorted_values[0], 30);
        assert_eq!(sorted_values[1], 40);
        assert_eq!(sorted_values[2], 50);

        // No ordering: the limit is pushed down and rows keep insertion order.
        let mut scanner = dataset.scan();
        scanner.limit(Some(3), None).unwrap();

        let stream = scanner.try_into_stream().await.unwrap();
        let batches = stream.try_collect::<Vec<_>>().await.unwrap();

        assert_eq!(batches[0].num_rows(), 3);
        let unsorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
        assert_eq!(unsorted_values[0], 50);
        assert_eq!(unsorted_values[1], 20);
        assert_eq!(unsorted_values[2], 80);
    }

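    // Scanning the _row_created_at_version / _row_last_updated_at_version meta
    // columns on a freshly written dataset should report version 1 for every row.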
    #[tokio::test]
    async fn test_scan_with_version_columns() {
        use arrow_array::{Int32Array, RecordBatch, RecordBatchIterator};
        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};

        let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
            "id",
            DataType::Int32,
            false,
        )]));

        let batch = RecordBatch::try_new(
            schema.clone(),
            vec![Arc::new(Int32Array::from(vec![1, 2, 3]))],
        )
        .unwrap();

        let test_dir = lance_core::utils::tempfile::TempStrDir::default();
        let test_uri = test_dir.as_str();

        let reader = RecordBatchIterator::new(vec![Ok(batch)], schema);
        let write_params = WriteParams {
            enable_stable_row_ids: true,
            ..Default::default()
        };
        Dataset::write(reader, test_uri, Some(write_params))
            .await
            .unwrap();

        let dataset = Dataset::open(test_uri).await.unwrap();
        let mut scanner = dataset.scan();

        scanner
            .project(&[ROW_CREATED_AT_VERSION, ROW_LAST_UPDATED_AT_VERSION])
            .unwrap();

        let output_schema = scanner.schema().await.unwrap();
        assert!(
            output_schema
                .column_with_name("_row_last_updated_at_version")
                .is_some(),
            "Schema should include _row_last_updated_at_version"
        );
        assert!(
            output_schema
                .column_with_name("_row_created_at_version")
                .is_some(),
            "Schema should include _row_created_at_version"
        );

        let batches = scanner
            .try_into_stream()
            .await
            .unwrap()
            .try_collect::<Vec<_>>()
            .await
            .unwrap();

        assert_eq!(batches.len(), 1);
        let batch = &batches[0];

        let last_updated = batch
            .column_by_name("_row_last_updated_at_version")
            .expect("Should have _row_last_updated_at_version column");
        let created_at = batch
            .column_by_name("_row_created_at_version")
            .expect("Should have _row_created_at_version column");

        let last_updated_array = last_updated
            .as_any()
            .downcast_ref::<arrow_array::UInt64Array>()
            .unwrap();
        let created_at_array = created_at
            .as_any()
            .downcast_ref::<arrow_array::UInt64Array>()
            .unwrap();

        for i in 0..batch.num_rows() {
            assert_eq!(
                last_updated_array.value(i),
                1,
                "All rows last updated at version 1"
            );
            assert_eq!(
                created_at_array.value(i),
                1,
                "All rows created at version 1"
            );
        }
    }
}