1use std::ops::Range;
5use std::pin::Pin;
6use std::sync::{Arc, LazyLock};
7use std::task::{Context, Poll};
8
9use arrow::array::AsArray;
10use arrow_array::{Array, Float32Array, Int64Array, RecordBatch};
11use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema, SchemaRef, SortOptions};
12use arrow_select::concat::concat_batches;
13use async_recursion::async_recursion;
14use datafusion::common::{exec_datafusion_err, DFSchema, NullEquality, SchemaExt};
15use datafusion::functions_aggregate;
16use datafusion::functions_aggregate::count::count_udaf;
17use datafusion::logical_expr::{col, lit, Expr, ScalarUDF};
18use datafusion::physical_expr::PhysicalSortExpr;
19use datafusion::physical_plan::coalesce_batches::CoalesceBatchesExec;
20use datafusion::physical_plan::expressions;
21use datafusion::physical_plan::projection::ProjectionExec as DFProjectionExec;
22use datafusion::physical_plan::sorts::sort::SortExec;
23use datafusion::physical_plan::{
24 aggregates::{AggregateExec, AggregateMode, PhysicalGroupBy},
25 display::DisplayableExecutionPlan,
26 expressions::Literal,
27 limit::GlobalLimitExec,
28 repartition::RepartitionExec,
29 union::UnionExec,
30 ExecutionPlan, SendableRecordBatchStream,
31};
32use datafusion::scalar::ScalarValue;
33use datafusion_expr::execution_props::ExecutionProps;
34use datafusion_expr::ExprSchemable;
35use datafusion_functions::core::getfield::GetFieldFunc;
36use datafusion_physical_expr::{aggregate::AggregateExprBuilder, expressions::Column};
37use datafusion_physical_expr::{create_physical_expr, LexOrdering, Partitioning, PhysicalExpr};
38use datafusion_physical_plan::stream::RecordBatchStreamAdapter;
39use datafusion_physical_plan::{empty::EmptyExec, joins::HashJoinExec};
40use futures::future::BoxFuture;
41use futures::stream::{Stream, StreamExt};
42use futures::{FutureExt, TryStreamExt};
43use lance_arrow::floats::{coerce_float_vector, FloatType};
44use lance_arrow::DataTypeExt;
45use lance_core::datatypes::{
46 escape_field_path_for_project, format_field_path, Field, OnMissing, Projection,
47};
48use lance_core::error::LanceOptionExt;
49use lance_core::utils::address::RowAddress;
50use lance_core::utils::mask::{RowIdMask, RowIdTreeMap};
51use lance_core::utils::tokio::get_num_compute_intensive_cpus;
52use lance_core::{ROW_ADDR, ROW_ID, ROW_OFFSET};
53use lance_datafusion::exec::{
54 analyze_plan, execute_plan, LanceExecutionOptions, OneShotExec, StrictBatchSizeExec,
55};
56use lance_datafusion::expr::safe_coerce_scalar;
57use lance_datafusion::projection::ProjectionPlan;
58use lance_file::v2::reader::FileReaderOptions;
59use lance_index::scalar::expression::{IndexExprResult, PlannerIndexExt, INDEX_EXPR_RESULT_SCHEMA};
60use lance_index::scalar::inverted::query::{
61 fill_fts_query_column, FtsQuery, FtsSearchParams, MatchQuery,
62};
63use lance_index::scalar::inverted::SCORE_COL;
64use lance_index::scalar::FullTextSearchQuery;
65use lance_index::vector::{Query, DIST_COL};
66use lance_index::ScalarIndexCriteria;
67use lance_index::{metrics::NoOpMetricsCollector, scalar::inverted::FTS_SCHEMA};
68use lance_index::{scalar::expression::ScalarIndexExpr, DatasetIndexExt};
69use lance_io::stream::RecordBatchStream;
70use lance_linalg::distance::MetricType;
71use lance_table::format::{Fragment, IndexMetadata};
72use roaring::RoaringBitmap;
73use tracing::{info_span, instrument, Span};
74
75use super::Dataset;
76use crate::dataset::row_offsets_to_row_addresses;
77use crate::dataset::utils::wrap_json_stream_for_reading;
78use crate::index::vector::utils::{get_vector_dim, get_vector_type};
79use crate::index::DatasetIndexInternalExt;
80use crate::io::exec::filtered_read::{FilteredReadExec, FilteredReadOptions};
81use crate::io::exec::fts::{BoostQueryExec, FlatMatchQueryExec, MatchQueryExec, PhraseQueryExec};
82use crate::io::exec::knn::MultivectorScoringExec;
83use crate::io::exec::scalar_index::{MaterializeIndexExec, ScalarIndexExec};
84use crate::io::exec::{get_physical_optimizer, AddRowOffsetExec, LanceFilterExec, LanceScanConfig};
85use crate::io::exec::{
86 knn::new_knn_exec, project, AddRowAddrExec, FilterPlan, KNNVectorDistanceExec,
87 LancePushdownScanExec, LanceScanExec, Planner, PreFilterSource, ScanConfig, TakeExec,
88};
89use crate::{datatypes::Schema, io::exec::fts::BooleanQueryExec};
90use crate::{Error, Result};
91
92use snafu::location;
93
94pub use lance_datafusion::exec::{ExecutionStatsCallback, ExecutionSummaryCounts};
95#[cfg(feature = "substrait")]
96use lance_datafusion::substrait::parse_substrait;
97
98pub(crate) const BATCH_SIZE_FALLBACK: usize = 8192;
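/// Returns the batch size override from the `LANCE_DEFAULT_BATCH_SIZE` environment
/// variable, if it is set.
///
/// A minimal sketch of how this could be used (assumes the variable is set in the
/// surrounding environment; the current implementation panics if the value is not a
/// valid integer):
///
/// ```no_run
/// if let Some(batch_size) = lance::dataset::scanner::get_default_batch_size() {
///     println!("overriding default batch size: {batch_size}");
/// }
/// ```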
99pub fn get_default_batch_size() -> Option<usize> {
102 std::env::var("LANCE_DEFAULT_BATCH_SIZE")
103 .map(|val| Some(val.parse().unwrap()))
104 .unwrap_or(None)
105}
106
107pub const LEGACY_DEFAULT_FRAGMENT_READAHEAD: usize = 4;
108
109pub static DEFAULT_FRAGMENT_READAHEAD: LazyLock<Option<usize>> = LazyLock::new(|| {
110 std::env::var("LANCE_DEFAULT_FRAGMENT_READAHEAD")
111 .map(|val| Some(val.parse().unwrap()))
112 .unwrap_or(None)
113});
114
115pub static DEFAULT_XTR_OVERFETCH: LazyLock<u32> = LazyLock::new(|| {
116 std::env::var("LANCE_XTR_OVERFETCH")
117 .map(|val| val.parse().unwrap())
118 .unwrap_or(10)
119});
120
121pub static DEFAULT_IO_BUFFER_SIZE: LazyLock<u64> = LazyLock::new(|| {
125 std::env::var("LANCE_DEFAULT_IO_BUFFER_SIZE")
126 .map(|val| val.parse().unwrap())
127 .unwrap_or(2 * 1024 * 1024 * 1024)
128});
129
130#[derive(Debug, Clone)]
135pub struct ColumnOrdering {
136 pub ascending: bool,
137 pub nulls_first: bool,
138 pub column_name: String,
139}
140
141impl ColumnOrdering {
142 pub fn asc_nulls_first(column_name: String) -> Self {
143 Self {
144 ascending: true,
145 nulls_first: true,
146 column_name,
147 }
148 }
149
150 pub fn asc_nulls_last(column_name: String) -> Self {
151 Self {
152 ascending: true,
153 nulls_first: false,
154 column_name,
155 }
156 }
157
158 pub fn desc_nulls_first(column_name: String) -> Self {
159 Self {
160 ascending: false,
161 nulls_first: true,
162 column_name,
163 }
164 }
165
166 pub fn desc_nulls_last(column_name: String) -> Self {
167 Self {
168 ascending: false,
169 nulls_first: false,
170 column_name,
171 }
172 }
173}
174
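/// Controls when columns are materialized (loaded from storage) relative to filtering
/// during a scan.
///
/// A hedged sketch of late-materializing one expensive column (the dataset path and the
/// `large_binary_col` column name are assumptions for the example):
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use lance::dataset::scanner::MaterializationStyle;
///
/// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
/// let style = MaterializationStyle::all_early_except(&["large_binary_col"], dataset.schema())?;
/// let mut scanner = dataset.scan();
/// scanner.materialization_style(style);
/// # Ok(())
/// # }
/// ```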
175#[derive(Clone)]
189pub enum MaterializationStyle {
190 Heuristic,
202 AllLate,
204 AllEarly,
206 AllEarlyExcept(Vec<u32>),
208}
209
210impl MaterializationStyle {
211 pub fn all_early_except(columns: &[impl AsRef<str>], schema: &Schema) -> Result<Self> {
212 let field_ids = schema
213 .project(columns)?
214 .field_ids()
215 .into_iter()
216 .map(|id| id as u32)
217 .collect();
218 Ok(Self::AllEarlyExcept(field_ids))
219 }
220}
221
222#[derive(Debug)]
223struct PlannedFilteredScan {
224 plan: Arc<dyn ExecutionPlan>,
225 limit_pushed_down: bool,
226 filter_pushed_down: bool,
227}
228
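/// A filter expression in one of the supported input forms: a SQL string, Substrait
/// bytes, or an already-built DataFusion `Expr`.
///
/// A rough sketch of the corresponding scanner entry points (the `price` column is an
/// assumption; the Substrait variant additionally requires the `substrait` feature):
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// use datafusion::logical_expr::{col, lit};
///
/// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
/// let mut scanner = dataset.scan();
/// // SQL string -> LanceFilter::Sql
/// scanner.filter("price > 100")?;
/// // DataFusion expression -> LanceFilter::Datafusion
/// scanner.filter_expr(col("price").gt(lit(100)));
/// # Ok(())
/// # }
/// ```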
229#[derive(Debug, Clone)]
231pub enum LanceFilter {
232 Sql(String),
234 Substrait(Vec<u8>),
236 Datafusion(Expr),
238}
239
240impl LanceFilter {
241 #[allow(unused)]
250 #[instrument(level = "trace", name = "filter_to_df", skip_all)]
251 pub fn to_datafusion(&self, dataset_schema: &Schema, full_schema: &Schema) -> Result<Expr> {
252 match self {
253 Self::Sql(sql) => {
254 let schema = Arc::new(ArrowSchema::from(full_schema));
255 let planner = Planner::new(schema.clone());
256 let filter = planner.parse_filter(sql)?;
257
258 let df_schema = DFSchema::try_from(schema)?;
259 let (ret_type, _) = filter.data_type_and_nullable(&df_schema)?;
260 if ret_type != DataType::Boolean {
261 return Err(Error::InvalidInput {
262 source: format!("The filter {} does not return a boolean", filter).into(),
263 location: location!(),
264 });
265 }
266
267 planner.optimize_expr(filter).map_err(|e| {
268 Error::invalid_input(
269 format!("Error optimizing sql filter: {sql} ({e})"),
270 location!(),
271 )
272 })
273 }
274 #[cfg(feature = "substrait")]
275 Self::Substrait(expr) => {
276 use lance_datafusion::exec::{get_session_context, LanceExecutionOptions};
277
278 let ctx = get_session_context(&LanceExecutionOptions::default());
279 let state = ctx.state();
280 let schema = Arc::new(ArrowSchema::from(dataset_schema));
                let expr = parse_substrait(expr, schema.clone(), &state)
282 .now_or_never()
283 .expect("could not parse the Substrait filter in a synchronous fashion")?;
284 let planner = Planner::new(schema);
285 planner.optimize_expr(expr.clone()).map_err(|e| {
286 Error::invalid_input(
287 format!("Error optimizing substrait filter: {expr:?} ({e})"),
288 location!(),
289 )
290 })
291 }
292 #[cfg(not(feature = "substrait"))]
293 Self::Substrait(_) => {
294 panic!("Substrait filter is not supported in this build");
295 }
296 Self::Datafusion(expr) => Ok(expr.clone()),
297 }
298 }
299}
300
301#[derive(Clone)]
315pub struct Scanner {
316 dataset: Arc<Dataset>,
317
318 projection_plan: ProjectionPlan,
325
326 prefilter: bool,
328
329 materialization_style: MaterializationStyle,
331
332 filter: Option<LanceFilter>,
334
335 full_text_query: Option<FullTextSearchQuery>,
337
338 batch_size: Option<usize>,
340
341 batch_readahead: usize,
343
344 fragment_readahead: Option<usize>,
346
347 io_buffer_size: Option<u64>,
349
350 limit: Option<i64>,
351 offset: Option<i64>,
352
353 ordering: Option<Vec<ColumnOrdering>>,
362
363 nearest: Option<Query>,
364
365 use_scalar_index: bool,
371
372 use_stats: bool,
376
377 ordered: bool,
381
382 fragments: Option<Vec<Fragment>>,
384
385 fast_search: bool,
392
393 include_deleted_rows: bool,
395
396 scan_stats_callback: Option<ExecutionStatsCallback>,
398
399 strict_batch_size: bool,
404
405 file_reader_options: Option<FileReaderOptions>,
407
408 legacy_with_row_id: bool,
429 legacy_with_row_addr: bool,
431 explicit_projection: bool,
434 autoproject_scoring_columns: bool,
436}
437
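/// A "take" detected from a filter that only constrains `_rowid`, `_rowaddr`, or
/// `_rowoffset` with `=` or `IN`. Such filters are rewritten into a direct row lookup
/// instead of a full scan.
///
/// A hedged sketch of a query shape that takes this path (whether the rewrite applies
/// is decided internally by `create_plan`):
///
/// ```no_run
/// # async fn example() -> lance::Result<()> {
/// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
/// let mut scanner = dataset.scan();
/// scanner.with_row_id();
/// scanner.filter("_rowid IN (1, 2, 3)")?;
/// let batch = scanner.try_into_batch().await?;
/// # Ok(())
/// # }
/// ```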
438#[derive(Debug, Clone)]
440pub enum TakeOperation {
441 RowIds(Vec<u64>),
443 RowAddrs(Vec<u64>),
445 RowOffsets(Vec<u64>),
450}
451
452impl TakeOperation {
453 fn extract_u64_list(list: &[Expr]) -> Option<Vec<u64>> {
454 let mut u64s = Vec::with_capacity(list.len());
455 for expr in list {
456 if let Expr::Literal(lit, _) = expr {
457 if let Some(ScalarValue::UInt64(Some(val))) =
458 safe_coerce_scalar(lit, &DataType::UInt64)
459 {
460 u64s.push(val);
461 } else {
462 return None;
463 }
464 } else {
465 return None;
466 }
467 }
468 Some(u64s)
469 }
470
471 fn merge(self, other: Self) -> Option<Self> {
472 match (self, other) {
473 (Self::RowIds(mut left), Self::RowIds(right)) => {
474 left.extend(right);
475 Some(Self::RowIds(left))
476 }
477 (Self::RowAddrs(mut left), Self::RowAddrs(right)) => {
478 left.extend(right);
479 Some(Self::RowAddrs(left))
480 }
481 (Self::RowOffsets(mut left), Self::RowOffsets(right)) => {
482 left.extend(right);
483 Some(Self::RowOffsets(left))
484 }
485 _ => None,
486 }
487 }
488
489 fn try_from_expr(expr: &Expr) -> Option<(Self, Option<Expr>)> {
507 if let Expr::BinaryExpr(binary) = expr {
508 match binary.op {
509 datafusion_expr::Operator::And => {
510 let left_take = Self::try_from_expr(&binary.left);
511 let right_take = Self::try_from_expr(&binary.right);
512 match (left_take, right_take) {
513 (Some(_), Some(_)) => {
514 return None;
520 }
521 (Some((left_op, left_rem)), None) => {
522 let remainder = match left_rem {
523 Some(expr) => Expr::and(expr, binary.right.as_ref().clone()),
527 None => binary.right.as_ref().clone(),
528 };
529 return Some((left_op, Some(remainder)));
530 }
531 (None, Some((right_op, right_rem))) => {
532 let remainder = match right_rem {
533 Some(expr) => Expr::and(expr, binary.left.as_ref().clone()),
534 None => binary.left.as_ref().clone(),
535 };
536 return Some((right_op, Some(remainder)));
537 }
538 (None, None) => {
539 return None;
540 }
541 }
542 }
543 datafusion_expr::Operator::Eq => {
544 if let (Expr::Column(col), Expr::Literal(lit, _)) =
546 (binary.left.as_ref(), binary.right.as_ref())
547 {
548 if let Some(ScalarValue::UInt64(Some(val))) =
549 safe_coerce_scalar(lit, &DataType::UInt64)
550 {
551 if col.name == ROW_ID {
552 return Some((Self::RowIds(vec![val]), None));
553 } else if col.name == ROW_ADDR {
554 return Some((Self::RowAddrs(vec![val]), None));
555 } else if col.name == ROW_OFFSET {
556 return Some((Self::RowOffsets(vec![val]), None));
557 }
558 }
559 }
560 }
561 datafusion_expr::Operator::Or => {
562 let left_take = Self::try_from_expr(&binary.left);
563 let right_take = Self::try_from_expr(&binary.right);
564 if let (Some(left), Some(right)) = (left_take, right_take) {
565 if left.1.is_some() || right.1.is_some() {
566 return None;
573 }
574 return left.0.merge(right.0).map(|op| (op, None));
575 }
576 }
577 _ => {}
578 }
579 } else if let Expr::InList(in_expr) = expr {
580 if let Expr::Column(col) = in_expr.expr.as_ref() {
581 if let Some(u64s) = Self::extract_u64_list(&in_expr.list) {
582 if col.name == ROW_ID {
583 return Some((Self::RowIds(u64s), None));
584 } else if col.name == ROW_ADDR {
585 return Some((Self::RowAddrs(u64s), None));
586 } else if col.name == ROW_OFFSET {
587 return Some((Self::RowOffsets(u64s), None));
588 }
589 }
590 }
591 }
592 None
593 }
594}
595
596impl Scanner {
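    /// Create a new scanner over the full dataset with a default (full) projection.
    ///
    /// A minimal end-to-end sketch of configuring and executing a scan (the path and
    /// column names are assumptions; `Dataset::scan` is the usual way to obtain a
    /// `Scanner`):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner
    ///     .project(&["id", "title"])?
    ///     .filter("id > 10")?
    ///     .limit(Some(100), None)?;
    /// let batch = scanner.try_into_batch().await?;
    /// println!("read {} rows", batch.num_rows());
    /// # Ok(())
    /// # }
    /// ```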
597 pub fn new(dataset: Arc<Dataset>) -> Self {
598 let projection_plan = ProjectionPlan::full(dataset.clone()).unwrap();
599 let file_reader_options = dataset.file_reader_options.clone();
600 Self {
601 dataset,
602 projection_plan,
603 prefilter: false,
604 materialization_style: MaterializationStyle::Heuristic,
605 filter: None,
606 full_text_query: None,
607 batch_size: None,
608 batch_readahead: get_num_compute_intensive_cpus(),
609 fragment_readahead: None,
610 io_buffer_size: None,
611 limit: None,
612 offset: None,
613 ordering: None,
614 nearest: None,
615 use_stats: true,
616 ordered: true,
617 fragments: None,
618 fast_search: false,
619 use_scalar_index: true,
620 include_deleted_rows: false,
621 scan_stats_callback: None,
622 strict_batch_size: false,
623 file_reader_options,
624 legacy_with_row_addr: false,
625 legacy_with_row_id: false,
626 explicit_projection: false,
627 autoproject_scoring_columns: true,
628 }
629 }
630
631 pub fn from_fragment(dataset: Arc<Dataset>, fragment: Fragment) -> Self {
632 Self {
633 fragments: Some(vec![fragment]),
634 ..Self::new(dataset)
635 }
636 }
637
638 pub fn with_fragments(&mut self, fragments: Vec<Fragment>) -> &mut Self {
642 self.fragments = Some(fragments);
643 self
644 }
645
646 fn get_batch_size(&self) -> usize {
647 get_default_batch_size().unwrap_or_else(|| {
653 self.batch_size.unwrap_or_else(|| {
654 std::cmp::max(
655 self.dataset.object_store().block_size() / 4,
656 BATCH_SIZE_FALLBACK,
657 )
658 })
659 })
660 }
661
662 fn ensure_not_fragment_scan(&self) -> Result<()> {
663 if self.is_fragment_scan() {
664 Err(Error::io(
665 "This operation is not supported for fragment scan".to_string(),
666 location!(),
667 ))
668 } else {
669 Ok(())
670 }
671 }
672
673 fn is_fragment_scan(&self) -> bool {
674 self.fragments.is_some()
675 }
676
677 pub fn empty_project(&mut self) -> Result<&mut Self> {
681 self.project(&[] as &[&str])
682 }
683
684 pub fn project<T: AsRef<str>>(&mut self, columns: &[T]) -> Result<&mut Self> {
688 let transformed_columns: Vec<(&str, String)> = columns
689 .iter()
690 .map(|c| (c.as_ref(), escape_field_path_for_project(c.as_ref())))
691 .collect();
692
693 self.project_with_transform(&transformed_columns)
694 }
695
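    /// Project with per-column output expressions, given as `(output_name, expression)`
    /// pairs.
    ///
    /// A hedged sketch (the `id` and `price` columns are assumptions; expressions are
    /// parsed as SQL against the dataset schema):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.project_with_transform(&[
    ///     ("id", "id"),
    ///     ("price_with_tax", "price * 1.08"),
    /// ])?;
    /// # Ok(())
    /// # }
    /// ```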
696 pub fn project_with_transform(
700 &mut self,
701 columns: &[(impl AsRef<str>, impl AsRef<str>)],
702 ) -> Result<&mut Self> {
703 self.explicit_projection = true;
704 self.projection_plan = ProjectionPlan::from_expressions(self.dataset.clone(), columns)?;
705 if self.legacy_with_row_id {
706 self.projection_plan.include_row_id();
707 }
708 if self.legacy_with_row_addr {
709 self.projection_plan.include_row_addr();
710 }
711 Ok(self)
712 }
713
714 pub fn prefilter(&mut self, should_prefilter: bool) -> &mut Self {
723 self.prefilter = should_prefilter;
724 self
725 }
726
727 pub fn scan_stats_callback(&mut self, callback: ExecutionStatsCallback) -> &mut Self {
729 self.scan_stats_callback = Some(callback);
730 self
731 }
732
733 pub fn materialization_style(&mut self, style: MaterializationStyle) -> &mut Self {
745 self.materialization_style = style;
746 self
747 }
748
749 pub fn filter(&mut self, filter: &str) -> Result<&mut Self> {
765 self.filter = Some(LanceFilter::Sql(filter.to_string()));
766 Ok(self)
767 }
768
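    /// Perform a full text search over columns with inverted (FTS) indices.
    ///
    /// A hedged sketch, assuming a `title` text column with an inverted index; when the
    /// query does not name columns, every indexed string column is searched:
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// use lance_index::scalar::FullTextSearchQuery;
    ///
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.full_text_search(FullTextSearchQuery::new("puppy".to_owned()))?;
    /// scanner.project(&["title"])?;
    /// // The `_score` column is currently appended automatically unless
    /// // `disable_scoring_autoprojection` is called.
    /// let batch = scanner.try_into_batch().await?;
    /// # Ok(())
    /// # }
    /// ```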
769 pub fn full_text_search(&mut self, query: FullTextSearchQuery) -> Result<&mut Self> {
783 let fields = query.columns();
784 if !fields.is_empty() {
785 for field in fields.iter() {
786 if self.dataset.schema().field(field).is_none() {
787 return Err(Error::invalid_input(
788 format!("Column {} not found", field),
789 location!(),
790 ));
791 }
792 }
793 }
794
795 self.full_text_query = Some(query);
796 Ok(self)
797 }
798
799 pub fn filter_substrait(&mut self, filter: &[u8]) -> Result<&mut Self> {
804 self.filter = Some(LanceFilter::Substrait(filter.to_vec()));
805 Ok(self)
806 }
807
808 pub fn filter_expr(&mut self, filter: Expr) -> &mut Self {
809 self.filter = Some(LanceFilter::Datafusion(filter));
810 self
811 }
812
813 pub fn batch_size(&mut self, batch_size: usize) -> &mut Self {
815 self.batch_size = Some(batch_size);
816 self
817 }
818
819 pub fn include_deleted_rows(&mut self) -> &mut Self {
830 self.include_deleted_rows = true;
831 self
832 }
833
834 pub fn io_buffer_size(&mut self, size: u64) -> &mut Self {
853 self.io_buffer_size = Some(size);
854 self
855 }
856
857 pub fn batch_readahead(&mut self, nbatches: usize) -> &mut Self {
860 self.batch_readahead = nbatches;
861 self
862 }
863
864 pub fn fragment_readahead(&mut self, nfragments: usize) -> &mut Self {
868 self.fragment_readahead = Some(nfragments);
869 self
870 }
871
872 pub fn scan_in_order(&mut self, ordered: bool) -> &mut Self {
886 self.ordered = ordered;
887 self
888 }
889
890 pub fn use_scalar_index(&mut self, use_scalar_index: bool) -> &mut Self {
896 self.use_scalar_index = use_scalar_index;
897 self
898 }
899
900 pub fn strict_batch_size(&mut self, strict_batch_size: bool) -> &mut Self {
907 self.strict_batch_size = strict_batch_size;
908 self
909 }
910
911 pub fn limit(&mut self, limit: Option<i64>, offset: Option<i64>) -> Result<&mut Self> {
918 if limit.unwrap_or_default() < 0 {
919 return Err(Error::invalid_input(
920 "Limit must be non-negative".to_string(),
921 location!(),
922 ));
923 }
924 if let Some(off) = offset {
925 if off < 0 {
926 return Err(Error::invalid_input(
927 "Offset must be non-negative".to_string(),
928 location!(),
929 ));
930 }
931 }
932 self.limit = limit;
933 self.offset = offset;
934 Ok(self)
935 }
936
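    /// Configure a nearest-neighbor (vector) search on `column` with query vector `q`,
    /// returning the top `k` results.
    ///
    /// A hedged sketch, assuming an `embedding` column of 128-dimensional float vectors
    /// (the query must match the column's dimension):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// use arrow_array::Float32Array;
    ///
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let query = Float32Array::from(vec![0.0_f32; 128]);
    /// let mut scanner = dataset.scan();
    /// scanner
    ///     .nearest("embedding", &query, 10)?
    ///     .minimum_nprobes(32)
    ///     .refine(2);
    /// // `_distance` is currently appended to the output automatically unless
    /// // `disable_scoring_autoprojection` is called.
    /// let batch = scanner.try_into_batch().await?;
    /// # Ok(())
    /// # }
    /// ```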
937 pub fn nearest(&mut self, column: &str, q: &dyn Array, k: usize) -> Result<&mut Self> {
941 if !self.prefilter {
942 self.ensure_not_fragment_scan()?;
945 }
946
947 if k == 0 {
948 return Err(Error::invalid_input(
949 "k must be positive".to_string(),
950 location!(),
951 ));
952 }
953 if q.is_empty() {
954 return Err(Error::invalid_input(
955 "Query vector must have non-zero length".to_string(),
956 location!(),
957 ));
958 }
959 let (vector_type, element_type) = get_vector_type(self.dataset.schema(), column)?;
961 let dim = get_vector_dim(self.dataset.schema(), column)?;
962
963 let q = match q.data_type() {
964 DataType::List(_) | DataType::FixedSizeList(_, _) => {
965 if !matches!(vector_type, DataType::List(_)) {
966 return Err(Error::invalid_input(
967 format!(
968 "Query is multivector but column {}({})is not multivector",
969 column, vector_type,
970 ),
971 location!(),
972 ));
973 }
974
975 if let Some(list_array) = q.as_list_opt::<i32>() {
976 for i in 0..list_array.len() {
977 let vec = list_array.value(i);
978 if vec.len() != dim {
979 return Err(Error::invalid_input(
980 format!(
981 "query dim({}) doesn't match the column {} vector dim({})",
982 vec.len(),
983 column,
984 dim,
985 ),
986 location!(),
987 ));
988 }
989 }
990 list_array.values().clone()
991 } else {
992 let fsl = q.as_fixed_size_list();
993 if fsl.value_length() as usize != dim {
994 return Err(Error::invalid_input(
995 format!(
996 "query dim({}) doesn't match the column {} vector dim({})",
997 fsl.value_length(),
998 column,
999 dim,
1000 ),
1001 location!(),
1002 ));
1003 }
1004 fsl.values().clone()
1005 }
1006 }
1007 _ => {
1008 if q.len() != dim {
1009 return Err(Error::invalid_input(
1010 format!(
1011 "query dim({}) doesn't match the column {} vector dim({})",
1012 q.len(),
1013 column,
1014 dim,
1015 ),
1016 location!(),
1017 ));
1018 }
1019 q.slice(0, q.len())
1020 }
1021 };
1022
1023 let key = match element_type {
1024 dt if dt == *q.data_type() => q,
1025 dt if dt.is_floating() => coerce_float_vector(
1026 q.as_any().downcast_ref::<Float32Array>().unwrap(),
1027 FloatType::try_from(&dt)?,
1028 )?,
1029 _ => {
1030 return Err(Error::invalid_input(
1031 format!(
1032 "Column {} has element type {} and the query vector is {}",
1033 column,
1034 element_type,
1035 q.data_type(),
1036 ),
1037 location!(),
1038 ));
1039 }
1040 };
1041
1042 self.nearest = Some(Query {
1043 column: column.to_string(),
1044 key,
1045 k,
1046 lower_bound: None,
1047 upper_bound: None,
1048 minimum_nprobes: 20,
1049 maximum_nprobes: None,
1050 ef: None,
1051 refine_factor: None,
1052 metric_type: MetricType::L2,
1053 use_index: true,
1054 dist_q_c: 0.0,
1055 });
1056 Ok(self)
1057 }
1058
1059 #[cfg(test)]
1060 fn nearest_mut(&mut self) -> Option<&mut Query> {
1061 self.nearest.as_mut()
1062 }
1063
1064 pub fn distance_range(
1066 &mut self,
1067 lower_bound: Option<f32>,
1068 upper_bound: Option<f32>,
1069 ) -> &mut Self {
1070 if let Some(q) = self.nearest.as_mut() {
1071 q.lower_bound = lower_bound;
1072 q.upper_bound = upper_bound;
1073 }
1074 self
1075 }
1076
1077 pub fn nprobs(&mut self, n: usize) -> &mut Self {
1082 if let Some(q) = self.nearest.as_mut() {
1083 q.minimum_nprobes = n;
1084 q.maximum_nprobes = Some(n);
1085 } else {
1086 log::warn!("nprobes is not set because nearest has not been called yet");
1087 }
1088 self
1089 }
1090
1091 pub fn minimum_nprobes(&mut self, n: usize) -> &mut Self {
1097 if let Some(q) = self.nearest.as_mut() {
1098 q.minimum_nprobes = n;
1099 } else {
1100 log::warn!("minimum_nprobes is not set because nearest has not been called yet");
1101 }
1102 self
1103 }
1104
1105 pub fn maximum_nprobes(&mut self, n: usize) -> &mut Self {
1117 if let Some(q) = self.nearest.as_mut() {
1118 q.maximum_nprobes = Some(n);
1119 } else {
1120 log::warn!("maximum_nprobes is not set because nearest has not been called yet");
1121 }
1122 self
1123 }
1124
1125 pub fn ef(&mut self, ef: usize) -> &mut Self {
1126 if let Some(q) = self.nearest.as_mut() {
1127 q.ef = Some(ef);
1128 }
1129 self
1130 }
1131
1132 pub fn fast_search(&mut self) -> &mut Self {
1138 if let Some(q) = self.nearest.as_mut() {
1139 q.use_index = true;
1140 }
1141 self.fast_search = true;
        self.projection_plan.include_row_id();
        self
1144 }
1145
1146 pub fn refine(&mut self, factor: u32) -> &mut Self {
1156 if let Some(q) = self.nearest.as_mut() {
1157 q.refine_factor = Some(factor)
1158 };
1159 self
1160 }
1161
1162 pub fn distance_metric(&mut self, metric_type: MetricType) -> &mut Self {
1164 if let Some(q) = self.nearest.as_mut() {
1165 q.metric_type = metric_type
1166 }
1167 self
1168 }
1169
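    /// Sort the results by one or more columns before applying limit/offset.
    ///
    /// A minimal sketch (the `price` and `id` columns are assumptions; passing `None`
    /// or an empty vector clears any previous ordering):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// use lance::dataset::scanner::ColumnOrdering;
    ///
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.order_by(Some(vec![
    ///     ColumnOrdering::desc_nulls_last("price".to_string()),
    ///     ColumnOrdering::asc_nulls_first("id".to_string()),
    /// ]))?;
    /// scanner.limit(Some(10), None)?;
    /// # Ok(())
    /// # }
    /// ```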
1170 pub fn order_by(&mut self, ordering: Option<Vec<ColumnOrdering>>) -> Result<&mut Self> {
1176 if let Some(ordering) = &ordering {
1177 if ordering.is_empty() {
1178 self.ordering = None;
1179 return Ok(self);
1180 }
1181 for column in ordering {
1183 self.dataset
1184 .schema()
1185 .field(&column.column_name)
1186 .ok_or(Error::invalid_input(
1187 format!("Column {} not found", &column.column_name),
1188 location!(),
1189 ))?;
1190 }
1191 }
1192 self.ordering = ordering;
1193 Ok(self)
1194 }
1195
1196 pub fn use_index(&mut self, use_index: bool) -> &mut Self {
1198 if let Some(q) = self.nearest.as_mut() {
1199 q.use_index = use_index
1200 }
1201 self
1202 }
1203
1204 pub fn with_row_id(&mut self) -> &mut Self {
1206 self.legacy_with_row_id = true;
1207 self.projection_plan.include_row_id();
1208 self
1209 }
1210
1211 pub fn with_row_address(&mut self) -> &mut Self {
1213 self.legacy_with_row_addr = true;
1214 self.projection_plan.include_row_addr();
1215 self
1216 }
1217
1218 pub fn disable_scoring_autoprojection(&mut self) -> &mut Self {
1234 self.autoproject_scoring_columns = false;
1235 self
1236 }
1237
1238 pub fn with_file_reader_options(&mut self, options: FileReaderOptions) -> &mut Self {
1240 self.file_reader_options = Some(options);
1241 self
1242 }
1243
1244 fn create_column_expr(
1246 column_name: &str,
1247 dataset: &Dataset,
1248 arrow_schema: &ArrowSchema,
1249 ) -> Result<Arc<dyn PhysicalExpr>> {
1250 let lance_schema = dataset.schema();
1251 let field_path = lance_schema.resolve(column_name).ok_or_else(|| {
1252 Error::invalid_input(
1253 format!("Field '{}' not found in schema", column_name),
1254 location!(),
1255 )
1256 })?;
1257
1258 if field_path.len() == 1 {
1259 expressions::col(&field_path[0].name, arrow_schema).map_err(|e| Error::Internal {
1261 message: format!(
1262 "Failed to create column expression for '{}': {}",
1263 column_name, e
1264 ),
1265 location: location!(),
1266 })
1267 } else {
1268 let get_field_func = ScalarUDF::from(GetFieldFunc::default());
1270
1271 let mut expr = col(&field_path[0].name);
1272 for nested_field in &field_path[1..] {
1273 expr = get_field_func.call(vec![expr, lit(&nested_field.name)]);
1274 }
1275
1276 let df_schema = Arc::new(DFSchema::try_from(arrow_schema.clone())?);
1278 let execution_props = ExecutionProps::default();
1279 create_physical_expr(&expr, &df_schema, &execution_props).map_err(|e| Error::Internal {
1280 message: format!(
1281 "Failed to create physical expression for nested field '{}': {}",
1282 column_name, e
1283 ),
1284 location: location!(),
1285 })
1286 }
1287 }
1288
1289 pub fn use_stats(&mut self, use_stats: bool) -> &mut Self {
1293 self.use_stats = use_stats;
1294 self
1295 }
1296
1297 pub async fn schema(&self) -> Result<SchemaRef> {
1299 let plan = self.create_plan().await?;
1300 Ok(plan.schema())
1301 }
1302
1303 pub fn get_filter(&self) -> Result<Option<Expr>> {
1310 if let Some(filter) = &self.filter {
1311 let filter_schema = self.filterable_schema()?;
1312 Ok(Some(filter.to_datafusion(
1313 self.dataset.schema(),
1314 filter_schema.as_ref(),
1315 )?))
1316 } else {
1317 Ok(None)
1318 }
1319 }
1320
1321 fn add_extra_columns(&self, schema: Schema) -> Result<Schema> {
1322 let mut extra_columns = vec![ArrowField::new(ROW_OFFSET, DataType::UInt64, true)];
1323
        if self.nearest.is_some() {
            extra_columns.push(ArrowField::new(DIST_COL, DataType::Float32, true));
        }
1327
1328 if self.full_text_query.is_some() {
1329 extra_columns.push(ArrowField::new(SCORE_COL, DataType::Float32, true));
1330 }
1331
1332 schema.merge(&ArrowSchema::new(extra_columns))
1333 }
1334
1335 fn filterable_schema(&self) -> Result<Arc<Schema>> {
1340 let base_schema = Projection::full(self.dataset.clone())
1341 .with_row_id()
1342 .with_row_addr()
1343 .to_schema();
1344
1345 Ok(Arc::new(self.add_extra_columns(base_schema)?))
1346 }
1347
1348 pub(crate) fn calculate_final_projection(
1353 &self,
1354 current_schema: &ArrowSchema,
1355 ) -> Result<Vec<(Arc<dyn PhysicalExpr>, String)>> {
1356 let mut output_expr = self.projection_plan.to_physical_exprs(current_schema)?;
1359
1360 if self.autoproject_scoring_columns {
1363 if self.nearest.is_some() && output_expr.iter().all(|(_, name)| name != DIST_COL) {
1364 if self.explicit_projection {
                    log::warn!("Deprecation warning, this behavior will change in the future. This search specified output columns but did not include `_distance`. Currently the `_distance` column will be included. In the future it will not. Call `disable_scoring_autoprojection` to adopt the future behavior and avoid this warning");
1366 }
1367 let vector_expr = expressions::col(DIST_COL, current_schema)?;
1368 output_expr.push((vector_expr, DIST_COL.to_string()));
1369 }
1370 if self.full_text_query.is_some()
1371 && output_expr.iter().all(|(_, name)| name != SCORE_COL)
1372 {
1373 if self.explicit_projection {
1374 log::warn!("Deprecation warning, this behavior will change in the future. This search specified output columns but did not include `_score`. Currently the `_score` column will be included. In the future it will not. Call `disable_scoring_autoprojection` to adopt the future behavior and avoid this warning");
1375 }
1376 let score_expr = expressions::col(SCORE_COL, current_schema)?;
1377 output_expr.push((score_expr, SCORE_COL.to_string()));
1378 }
1379 }
1380
1381 if self.legacy_with_row_id {
1382 let row_id_pos = output_expr
1383 .iter()
1384 .position(|(_, name)| name == ROW_ID)
1385 .ok_or_else(|| Error::Internal {
1386 message:
1387 "user specified with_row_id but the _rowid column was not in the output"
1388 .to_string(),
1389 location: location!(),
1390 })?;
1391 if row_id_pos != output_expr.len() - 1 {
1392 let row_id_expr = output_expr.remove(row_id_pos);
1394 output_expr.push(row_id_expr);
1395 }
1396 }
1397
1398 if self.legacy_with_row_addr {
            let row_addr_pos = output_expr
                .iter()
                .position(|(_, name)| name == ROW_ADDR)
                .ok_or_else(|| Error::Internal {
                    message: "user specified with_row_address but the _rowaddr column was not in the output"
                        .to_string(),
                    location: location!(),
                })?;
1405 if row_addr_pos != output_expr.len() - 1 {
1406 let row_addr_expr = output_expr.remove(row_addr_pos);
1408 output_expr.push(row_addr_expr);
1409 }
1410 }
1411
1412 Ok(output_expr)
1413 }
1414
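    /// Execute the scan and return a stream of record batches.
    ///
    /// A minimal sketch of draining the stream (the path and column name are
    /// assumptions):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// use futures::TryStreamExt;
    ///
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.project(&["id"])?;
    /// let mut stream = scanner.try_into_stream().await?;
    /// while let Some(batch) = stream.try_next().await? {
    ///     println!("got {} rows", batch.num_rows());
    /// }
    /// # Ok(())
    /// # }
    /// ```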
1415 #[instrument(skip_all)]
1417 pub fn try_into_stream(&self) -> BoxFuture<'_, Result<DatasetRecordBatchStream>> {
1418 async move {
1420 let plan = self.create_plan().await?;
1421
1422 Ok(DatasetRecordBatchStream::new(execute_plan(
1423 plan,
1424 LanceExecutionOptions {
1425 batch_size: self.batch_size,
1426 execution_stats_callback: self.scan_stats_callback.clone(),
1427 ..Default::default()
1428 },
1429 )?))
1430 }
1431 .boxed()
1432 }
1433
1434 pub(crate) async fn try_into_dfstream(
1435 &self,
1436 mut options: LanceExecutionOptions,
1437 ) -> Result<SendableRecordBatchStream> {
1438 let plan = self.create_plan().await?;
1439
1440 if options.execution_stats_callback.is_none() {
1442 options.execution_stats_callback = self.scan_stats_callback.clone();
1443 }
1444
1445 execute_plan(plan, options)
1446 }
1447
1448 pub async fn try_into_batch(&self) -> Result<RecordBatch> {
1449 let stream = self.try_into_stream().await?;
1450 let schema = stream.schema();
1451 let batches = stream.try_collect::<Vec<_>>().await?;
1452 Ok(concat_batches(&schema, &batches)?)
1453 }
1454
1455 fn create_count_plan(&self) -> BoxFuture<'_, Result<Arc<dyn ExecutionPlan>>> {
1456 async move {
1458 if self.projection_plan.physical_projection.is_empty() {
1459 return Err(Error::invalid_input("count_rows called but with_row_id is false".to_string(), location!()));
1460 }
1461 if !self.projection_plan.physical_projection.is_metadata_only() {
1462 let physical_schema = self.projection_plan.physical_projection.to_schema();
1463 let columns: Vec<&str> = physical_schema.fields
1464 .iter()
1465 .map(|field| field.name.as_str())
1466 .collect();
1467
1468 let msg = format!(
1469 "count_rows should not be called on a plan selecting columns. selected columns: [{}]",
1470 columns.join(", ")
1471 );
1472
1473 return Err(Error::invalid_input(msg, location!()));
1474 }
1475
1476 if self.limit.is_some() || self.offset.is_some() {
1477 log::warn!(
1478 "count_rows called with limit or offset which could have surprising results"
1479 );
1480 }
1481
1482 let plan = self.create_plan().await?;
1483 let one = Arc::new(Literal::new(ScalarValue::UInt8(Some(1))));
1485
1486 let input_phy_exprs: &[Arc<dyn PhysicalExpr>] = &[one];
1487 let schema = plan.schema();
1488
1489 let mut builder = AggregateExprBuilder::new(count_udaf(), input_phy_exprs.to_vec());
1490 builder = builder.schema(schema);
1491 builder = builder.alias("count_rows".to_string());
1492
1493 let count_expr = builder.build()?;
1494
1495 let plan_schema = plan.schema();
1496 Ok(Arc::new(AggregateExec::try_new(
1497 AggregateMode::Single,
1498 PhysicalGroupBy::new_single(Vec::new()),
1499 vec![Arc::new(count_expr)],
1500 vec![None],
1501 plan,
1502 plan_schema,
1503 )?) as Arc<dyn ExecutionPlan>)
1504 }
1505 .boxed()
1506 }
1507
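    /// Count the rows that would be returned by this scan.
    ///
    /// A hedged sketch: the plan must be metadata-only, so select no columns and keep
    /// the row id before counting (the filter column is an assumption):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.filter("price > 100")?;
    /// scanner.empty_project()?.with_row_id();
    /// let num_rows = scanner.count_rows().await?;
    /// println!("{num_rows} rows match");
    /// # Ok(())
    /// # }
    /// ```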
1508 #[instrument(skip_all)]
1513 pub fn count_rows(&self) -> BoxFuture<'_, Result<u64>> {
1514 async move {
1516 let count_plan = self.create_count_plan().await?;
1517 let mut stream = execute_plan(count_plan, LanceExecutionOptions::default())?;
1518
1519 if let Some(first_batch) = stream.next().await {
1521 let batch = first_batch?;
1522 let array = batch
1523 .column(0)
1524 .as_any()
1525 .downcast_ref::<Int64Array>()
1526 .ok_or(Error::io(
1527 "Count plan did not return a UInt64Array".to_string(),
1528 location!(),
1529 ))?;
1530 Ok(array.value(0) as u64)
1531 } else {
1532 Ok(0)
1533 }
1534 }
1535 .boxed()
1536 }
1537
1538 fn is_early_field(&self, field: &Field) -> bool {
1559 match self.materialization_style {
1560 MaterializationStyle::AllEarly => true,
1561 MaterializationStyle::AllLate => false,
1562 MaterializationStyle::AllEarlyExcept(ref cols) => !cols.contains(&(field.id as u32)),
1563 MaterializationStyle::Heuristic => {
1564 if field.is_blob() {
1565 return true;
1570 }
1571
1572 let byte_width = field.data_type().byte_width_opt();
1573 let is_cloud = self.dataset.object_store().is_cloud();
1574 if is_cloud {
1575 byte_width.is_some_and(|bw| bw < 1000)
1576 } else {
1577 byte_width.is_some_and(|bw| bw < 10)
1578 }
1579 }
1580 }
1581 }
1582
1583 fn calc_eager_projection(
1588 &self,
1589 filter_plan: &FilterPlan,
1590 desired_projection: &Projection,
1591 ) -> Result<Projection> {
1592 let filter_columns = filter_plan.all_columns();
1599
1600 let filter_schema = self
1601 .dataset
1602 .empty_projection()
1603 .union_columns(filter_columns, OnMissing::Error)?
1604 .into_schema();
1605 if filter_schema.fields.iter().any(|f| !f.is_default_storage()) {
1606 return Err(Error::NotSupported {
1607 source: "non-default storage columns cannot be used as filters".into(),
1608 location: location!(),
1609 });
1610 }
1611
1612 Ok(desired_projection
1614 .clone()
1615 .subtract_predicate(|f| !self.is_early_field(f))
1617 .union_schema(&filter_schema))
1619 }
1620
1621 fn validate_options(&self) -> Result<()> {
1622 if self.include_deleted_rows && !self.projection_plan.physical_projection.with_row_id {
1623 return Err(Error::InvalidInput {
1624 source: "include_deleted_rows is set but with_row_id is false".into(),
1625 location: location!(),
1626 });
1627 }
1628
1629 Ok(())
1630 }
1631
1632 async fn create_filter_plan(&self, use_scalar_index: bool) -> Result<FilterPlan> {
1633 let filter_schema = self.filterable_schema()?;
1634 let planner = Planner::new(Arc::new(filter_schema.as_ref().into()));
1635
1636 if let Some(filter) = self.filter.as_ref() {
1637 let filter = filter.to_datafusion(self.dataset.schema(), filter_schema.as_ref())?;
1638 let index_info = self.dataset.scalar_index_info().await?;
1639 let filter_plan =
1640 planner.create_filter_plan(filter.clone(), &index_info, use_scalar_index)?;
1641
1642 if filter_plan.index_query.is_some() {
1645 let fragments = if let Some(fragments) = self.fragments.as_ref() {
1646 fragments
1647 } else {
1648 self.dataset.fragments()
1649 };
1650 let mut has_missing_row_count = false;
1651 for frag in fragments {
1652 if frag.physical_rows.is_none() {
1653 has_missing_row_count = true;
1654 break;
1655 }
1656 }
1657 if has_missing_row_count {
1658 Ok(planner.create_filter_plan(filter.clone(), &index_info, false)?)
1661 } else {
1662 Ok(filter_plan)
1663 }
1664 } else {
1665 Ok(filter_plan)
1666 }
1667 } else {
1668 Ok(FilterPlan::default())
1669 }
1670 }
1671
1672 async fn get_scan_range(&self, filter_plan: &FilterPlan) -> Result<Option<Range<u64>>> {
1673 if filter_plan.has_any_filter() {
1674 Ok(None)
1676 } else if self.ordering.is_some() {
1677 Ok(None)
1680 } else {
1681 match (self.limit, self.offset) {
1682 (None, None) => Ok(None),
1683 (Some(limit), None) => {
1684 let num_rows = self.dataset.count_all_rows().await? as i64;
1685 Ok(Some(0..limit.min(num_rows) as u64))
1686 }
1687 (None, Some(offset)) => {
1688 let num_rows = self.dataset.count_all_rows().await? as i64;
1689 Ok(Some(offset.min(num_rows) as u64..num_rows as u64))
1690 }
1691 (Some(limit), Some(offset)) => {
1692 let num_rows = self.dataset.count_all_rows().await? as i64;
1693 Ok(Some(
1694 offset.min(num_rows) as u64..(offset + limit).min(num_rows) as u64,
1695 ))
1696 }
1697 }
1698 }
1699 }
1700
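    /// Build the physical execution plan for this scan without executing it.
    ///
    /// A minimal sketch for inspecting the plan, using DataFusion's plan display
    /// (assumes a filter on an existing `id` column):
    ///
    /// ```no_run
    /// # async fn example() -> lance::Result<()> {
    /// use datafusion::physical_plan::display::DisplayableExecutionPlan;
    ///
    /// let dataset = lance::dataset::Dataset::open("./my_table.lance").await?;
    /// let mut scanner = dataset.scan();
    /// scanner.filter("id > 10")?;
    /// let plan = scanner.create_plan().await?;
    /// println!("{}", DisplayableExecutionPlan::new(plan.as_ref()).indent(true));
    /// # Ok(())
    /// # }
    /// ```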
1701 #[instrument(level = "debug", skip_all)]
1747 pub async fn create_plan(&self) -> Result<Arc<dyn ExecutionPlan>> {
1748 log::trace!("creating scanner plan");
1749 self.validate_options()?;
1750
1751 let use_scalar_index = self.use_scalar_index && (self.prefilter || self.nearest.is_none());
1753 let mut filter_plan = self.create_filter_plan(use_scalar_index).await?;
1754
1755 let mut use_limit_node = true;
1756 let mut plan: Arc<dyn ExecutionPlan> = match (&self.nearest, &self.full_text_query) {
1758 (Some(_), None) => self.vector_search_source(&mut filter_plan).await?,
1759 (None, Some(query)) => self.fts_search_source(&mut filter_plan, query).await?,
1760 (None, None) => {
1761 if self.projection_plan.has_output_cols()
1762 && self.projection_plan.physical_projection.is_empty()
1763 {
1764 let output_expr = self.calculate_final_projection(&ArrowSchema::empty())?;
1775 return Err(Error::NotSupported {
1776 source: format!("Scans must request at least one column. Received only dynamic expressions: {:?}", output_expr).into(),
1777 location: location!(),
1778 });
1779 }
1780
1781 let take_op = filter_plan
1782 .full_expr
1783 .as_ref()
1784 .and_then(TakeOperation::try_from_expr);
1785 if let Some((take_op, remainder)) = take_op {
1786 filter_plan = remainder
1789 .map(FilterPlan::new_refine_only)
1790 .unwrap_or(FilterPlan::default());
1791 self.take_source(take_op).await?
1792 } else {
1793 let planned_read = self.filtered_read_source(&mut filter_plan).await?;
1794 if planned_read.limit_pushed_down {
1795 use_limit_node = false;
1796 }
1797 if planned_read.filter_pushed_down {
1798 filter_plan = FilterPlan::default();
1799 }
1800 planned_read.plan
1801 }
1802 }
1803 _ => {
1804 return Err(Error::InvalidInput {
1805 source: "Cannot have both nearest and full text search".into(),
1806 location: location!(),
1807 })
1808 }
1809 };
1810
1811 let mut pre_filter_projection = self.dataset.empty_projection();
1814
1815 if filter_plan.has_refine() {
1818 pre_filter_projection = pre_filter_projection
1820 .union_columns(filter_plan.refine_columns(), OnMissing::Ignore)?;
1821 }
1822
1823 if let Some(ordering) = &self.ordering {
1828 pre_filter_projection = pre_filter_projection.union_columns(
1829 ordering.iter().map(|col| &col.column_name),
1830 OnMissing::Error,
1831 )?;
1832 }
1833
1834 plan = self.take(plan, pre_filter_projection)?;
1835
1836 if let Some(refine_expr) = filter_plan.refine_expr {
1838 plan = Arc::new(LanceFilterExec::try_new(refine_expr, plan)?);
1841 }
1842
1843 if let Some(ordering) = &self.ordering {
1845 let ordering_columns = ordering.iter().map(|col| &col.column_name);
1846 let projection_with_ordering = self
1847 .dataset
1848 .empty_projection()
1849 .union_columns(ordering_columns, OnMissing::Error)?;
1850 plan = self.take(plan, projection_with_ordering)?;
1852 let col_exprs = ordering
1853 .iter()
1854 .map(|col| {
1855 Ok(PhysicalSortExpr {
1856 expr: Self::create_column_expr(
1857 &col.column_name,
1858 &self.dataset,
1859 plan.schema().as_ref(),
1860 )?,
1861 options: SortOptions {
1862 descending: !col.ascending,
1863 nulls_first: col.nulls_first,
1864 },
1865 })
1866 })
1867 .collect::<Result<Vec<_>>>()?;
1868 plan = Arc::new(SortExec::new(
1869 LexOrdering::new(col_exprs)
1870 .ok_or(exec_datafusion_err!("Unexpected empty sort expressions"))?,
1871 plan,
1872 ));
1873 }
1874
1875 if use_limit_node && (self.limit.unwrap_or(0) > 0 || self.offset.is_some()) {
1877 plan = self.limit_node(plan);
1878 }
1879
1880 plan = self.take(plan, self.projection_plan.physical_projection.clone())?;
1882
1883 if self.projection_plan.must_add_row_offset {
1885 plan = Arc::new(AddRowOffsetExec::try_new(plan, self.dataset.clone()).await?);
1886 }
1887
1888 let final_projection = self.calculate_final_projection(plan.schema().as_ref())?;
1890
1891 plan = Arc::new(DFProjectionExec::try_new(final_projection, plan)?);
1892
1893 if self.strict_batch_size {
1895 plan = Arc::new(StrictBatchSizeExec::new(plan, self.get_batch_size()));
1896 }
1897
1898 let optimizer = get_physical_optimizer();
1899 let options = Default::default();
1900 for rule in optimizer.rules {
1901 plan = rule.optimize(plan, &options)?;
1902 }
1903
1904 Ok(plan)
1905 }
1906
1907 async fn legacy_filtered_read(
1913 &self,
1914 filter_plan: &FilterPlan,
1915 projection: Projection,
1916 make_deletions_null: bool,
1917 fragments: Option<Arc<Vec<Fragment>>>,
1918 scan_range: Option<Range<u64>>,
1919 is_prefilter: bool,
1920 ) -> Result<PlannedFilteredScan> {
1921 let fragments = fragments.unwrap_or(self.dataset.fragments().clone());
1922 let mut filter_pushed_down = false;
1923
1924 let plan: Arc<dyn ExecutionPlan> = if filter_plan.has_index_query() {
1925 if self.include_deleted_rows {
1926 return Err(Error::InvalidInput {
1927 source: "Cannot include deleted rows in a scalar indexed scan".into(),
1928 location: location!(),
1929 });
1930 }
1931 self.scalar_indexed_scan(projection, filter_plan, fragments)
1932 .await
1933 } else if !is_prefilter
1934 && filter_plan.has_refine()
1935 && self.batch_size.is_none()
1936 && self.use_stats
1937 {
1938 filter_pushed_down = true;
1939 self.pushdown_scan(false, filter_plan)
1940 } else {
1941 let ordered = if self.ordering.is_some() || self.nearest.is_some() {
1942 false
1944 } else {
1945 self.ordered
1946 };
1947
1948 let projection = if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
1949 if is_prefilter {
1950 let refine_cols = Planner::column_names_in_expr(refine_expr);
1951 projection.union_columns(refine_cols, OnMissing::Error)?
1952 } else {
1953 projection
1954 }
1955 } else {
1956 projection
1957 };
1958
1959 let scan_range = if filter_plan.has_refine() {
1961 None
1962 } else {
1963 scan_range
1964 };
1965
1966 let scan = self.scan_fragments(
1967 projection.with_row_id,
1968 self.projection_plan.physical_projection.with_row_addr,
1969 make_deletions_null,
1970 Arc::new(projection.to_bare_schema()),
1971 fragments,
1972 scan_range,
1973 ordered,
1974 );
1975
1976 if filter_plan.has_refine() && is_prefilter {
1977 Ok(Arc::new(LanceFilterExec::try_new(
1978 filter_plan.refine_expr.clone().unwrap(),
1979 scan,
1980 )?) as Arc<dyn ExecutionPlan>)
1981 } else {
1982 Ok(scan)
1983 }
1984 }?;
1985 Ok(PlannedFilteredScan {
1986 plan,
1987 limit_pushed_down: false,
1988 filter_pushed_down,
1989 })
1990 }
1991
1992 async fn new_filtered_read(
1996 &self,
1997 filter_plan: &FilterPlan,
1998 projection: Projection,
1999 make_deletions_null: bool,
2000 fragments: Option<Arc<Vec<Fragment>>>,
2001 scan_range: Option<Range<u64>>,
2002 ) -> Result<Arc<dyn ExecutionPlan>> {
2003 let mut read_options = FilteredReadOptions::basic_full_read(&self.dataset)
2004 .with_filter_plan(filter_plan.clone())
2005 .with_projection(projection);
2006
2007 if let Some(fragments) = fragments {
2008 read_options = read_options.with_fragments(fragments);
2009 }
2010
2011 if let Some(scan_range) = scan_range {
2012 read_options = read_options.with_scan_range_before_filter(scan_range)?;
2013 }
2014
2015 if let Some(batch_size) = self.batch_size {
2016 read_options = read_options.with_batch_size(batch_size as u32);
2017 }
2018
2019 if let Some(fragment_readahead) = self.fragment_readahead {
2020 read_options = read_options.with_fragment_readahead(fragment_readahead);
2021 }
2022
2023 if make_deletions_null {
2024 read_options = read_options.with_deleted_rows()?;
2025 }
2026
2027 let index_input = filter_plan.index_query.clone().map(|index_query| {
2028 Arc::new(ScalarIndexExec::new(self.dataset.clone(), index_query))
2029 as Arc<dyn ExecutionPlan>
2030 });
2031
2032 Ok(Arc::new(FilteredReadExec::try_new(
2033 self.dataset.clone(),
2034 read_options,
2035 index_input,
2036 )?))
2037 }
2038
2039 async fn filtered_read(
2043 &self,
2044 filter_plan: &FilterPlan,
2045 projection: Projection,
2046 make_deletions_null: bool,
2047 fragments: Option<Arc<Vec<Fragment>>>,
2048 scan_range: Option<Range<u64>>,
2049 is_prefilter: bool,
2050 ) -> Result<PlannedFilteredScan> {
2051 if self.dataset.is_legacy_storage() {
2052 self.legacy_filtered_read(
2053 filter_plan,
2054 projection,
2055 make_deletions_null,
2056 fragments,
2057 scan_range,
2058 is_prefilter,
2059 )
2060 .await
2061 } else {
2062 let limit_pushed_down = scan_range.is_some();
2063 let plan = self
2064 .new_filtered_read(
2065 filter_plan,
2066 projection,
2067 make_deletions_null,
2068 fragments,
2069 scan_range,
2070 )
2071 .await?;
2072 Ok(PlannedFilteredScan {
2073 filter_pushed_down: true,
2074 limit_pushed_down,
2075 plan,
2076 })
2077 }
2078 }
2079
2080 fn u64s_as_take_input(&self, u64s: Vec<u64>) -> Result<Arc<dyn ExecutionPlan>> {
2081 let row_ids = RowIdTreeMap::from_iter(u64s);
2082 let row_id_mask = RowIdMask::from_allowed(row_ids);
2083 let index_result = IndexExprResult::Exact(row_id_mask);
2084 let fragments_covered =
2085 RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32));
2086 let batch = index_result.serialize_to_arrow(&fragments_covered)?;
2087 let stream = futures::stream::once(async move { Ok(batch) });
2088 let stream = Box::pin(RecordBatchStreamAdapter::new(
2089 INDEX_EXPR_RESULT_SCHEMA.clone(),
2090 stream,
2091 ));
2092 Ok(Arc::new(OneShotExec::new(stream)))
2093 }
2094
2095 async fn take_source(&self, take_op: TakeOperation) -> Result<Arc<dyn ExecutionPlan>> {
2096 let projection = self.projection_plan.physical_projection.clone();
2099
2100 let input = match take_op {
2101 TakeOperation::RowIds(ids) => self.u64s_as_take_input(ids),
2102 TakeOperation::RowAddrs(addrs) => self.u64s_as_take_input(addrs),
2103 TakeOperation::RowOffsets(offsets) => {
2104 let mut addrs =
2105 row_offsets_to_row_addresses(self.dataset.as_ref(), &offsets).await?;
2106 addrs.retain(|addr| *addr != RowAddress::TOMBSTONE_ROW);
2107 self.u64s_as_take_input(addrs)
2108 }
2109 }?;
2110
2111 Ok(Arc::new(FilteredReadExec::try_new(
2112 self.dataset.clone(),
2113 FilteredReadOptions::new(projection),
2114 Some(input),
2115 )?))
2116 }
2117
2118 async fn filtered_read_source(
2119 &self,
2120 filter_plan: &mut FilterPlan,
2121 ) -> Result<PlannedFilteredScan> {
2122 log::trace!("source is a filtered read");
2123 let mut projection = if filter_plan.has_refine() {
2124 self.calc_eager_projection(filter_plan, &self.projection_plan.physical_projection)?
2128 .with_row_id()
2129 } else {
2130 self.projection_plan.physical_projection.clone()
2133 };
2134
2135 if projection.is_empty() {
2136 projection.with_row_addr = true;
2139 }
2140
2141 let scan_range = if filter_plan.is_empty() {
2142 log::trace!("pushing scan_range into filtered_read");
2143 self.get_scan_range(filter_plan).await?
2144 } else {
2145 None
2146 };
2147
2148 self.filtered_read(
2149 filter_plan,
2150 projection,
2151 self.include_deleted_rows,
2152 self.fragments.clone().map(Arc::new),
2153 scan_range,
2154 false,
2155 )
2156 .await
2157 }
2158
2159 async fn fts_search_source(
2160 &self,
2161 filter_plan: &mut FilterPlan,
2162 query: &FullTextSearchQuery,
2163 ) -> Result<Arc<dyn ExecutionPlan>> {
2164 log::trace!("source is an fts search");
2165 if self.include_deleted_rows {
2166 return Err(Error::InvalidInput {
2167 source: "Cannot include deleted rows in an FTS search".into(),
2168 location: location!(),
2169 });
2170 }
2171
2172 if self.prefilter {
2174 let source = self.fts(filter_plan, query).await?;
2176 *filter_plan = FilterPlan::default();
2177 Ok(source)
2178 } else {
2179 filter_plan.make_refine_only();
2182 self.fts(&FilterPlan::default(), query).await
2183 }
2184 }
2185
2186 async fn vector_search_source(
2187 &self,
2188 filter_plan: &mut FilterPlan,
2189 ) -> Result<Arc<dyn ExecutionPlan>> {
2190 if self.include_deleted_rows {
2191 return Err(Error::InvalidInput {
2192 source: "Cannot include deleted rows in a nearest neighbor search".into(),
2193 location: location!(),
2194 });
2195 }
2196
2197 if self.prefilter {
2198 log::trace!("source is a vector search (prefilter)");
2199 let source = self.vector_search(filter_plan).await?;
2201 *filter_plan = FilterPlan::default();
2202 Ok(source)
2203 } else {
2204 log::trace!("source is a vector search (postfilter)");
2205 filter_plan.make_refine_only();
2208 self.vector_search(&FilterPlan::default()).await
2209 }
2210 }
2211
2212 async fn fragments_covered_by_fts_leaf(
2213 &self,
2214 column: &str,
2215 accum: &mut RoaringBitmap,
2216 ) -> Result<bool> {
2217 let index = self
2218 .dataset
2219 .load_scalar_index(
2220 ScalarIndexCriteria::default()
2221 .for_column(column)
2222 .supports_fts(),
2223 )
2224 .await?
2225 .ok_or(Error::invalid_input(
2226 format!("Column {} has no inverted index", column),
2227 location!(),
2228 ))?;
2229 if let Some(fragmap) = &index.fragment_bitmap {
2230 *accum |= fragmap;
2231 Ok(true)
2232 } else {
2233 Ok(false)
2234 }
2235 }
2236
2237 #[async_recursion]
2238 async fn fragments_covered_by_fts_query_helper(
2239 &self,
2240 query: &FtsQuery,
2241 accum: &mut RoaringBitmap,
2242 ) -> Result<bool> {
2243 match query {
2244 FtsQuery::Match(match_query) => {
2245 self.fragments_covered_by_fts_leaf(
2246 match_query.column.as_ref().ok_or(Error::invalid_input(
2247 "the column must be specified in the query".to_string(),
2248 location!(),
2249 ))?,
2250 accum,
2251 )
2252 .await
2253 }
2254 FtsQuery::Boost(boost) => Ok(self
2255 .fragments_covered_by_fts_query_helper(&boost.negative, accum)
2256 .await?
2257 & self
2258 .fragments_covered_by_fts_query_helper(&boost.positive, accum)
2259 .await?),
2260 FtsQuery::MultiMatch(multi_match) => {
2261 for mq in &multi_match.match_queries {
2262 if !self
2263 .fragments_covered_by_fts_leaf(
2264 mq.column.as_ref().ok_or(Error::invalid_input(
2265 "the column must be specified in the query".to_string(),
2266 location!(),
2267 ))?,
2268 accum,
2269 )
2270 .await?
2271 {
2272 return Ok(false);
2273 }
2274 }
2275 Ok(true)
2276 }
2277 FtsQuery::Phrase(phrase_query) => {
2278 self.fragments_covered_by_fts_leaf(
2279 phrase_query.column.as_ref().ok_or(Error::invalid_input(
2280 "the column must be specified in the query".to_string(),
2281 location!(),
2282 ))?,
2283 accum,
2284 )
2285 .await
2286 }
2287 FtsQuery::Boolean(bool_query) => {
2288 for query in bool_query.must.iter() {
2289 if !self
2290 .fragments_covered_by_fts_query_helper(query, accum)
2291 .await?
2292 {
2293 return Ok(false);
2294 }
2295 }
2296 for query in &bool_query.should {
2297 if !self
2298 .fragments_covered_by_fts_query_helper(query, accum)
2299 .await?
2300 {
2301 return Ok(false);
2302 }
2303 }
2304 Ok(true)
2305 }
2306 }
2307 }
2308
2309 async fn fragments_covered_by_fts_query(&self, query: &FtsQuery) -> Result<RoaringBitmap> {
2310 let all_fragments = self.get_fragments_as_bitmap();
2311
2312 let mut referenced_fragments = RoaringBitmap::new();
2313 if !self
2314 .fragments_covered_by_fts_query_helper(query, &mut referenced_fragments)
2315 .await?
2316 {
2317 Ok(all_fragments)
2319 } else {
2320 Ok(all_fragments & referenced_fragments)
2322 }
2323 }
2324
2325 async fn fts(
2327 &self,
2328 filter_plan: &FilterPlan,
2329 query: &FullTextSearchQuery,
2330 ) -> Result<Arc<dyn ExecutionPlan>> {
2331 let columns = query.columns();
2332 let mut params = query.params();
2333 if params.limit.is_none() {
2334 let search_limit = match (self.limit, self.offset) {
2335 (Some(limit), Some(offset)) => Some((limit + offset) as usize),
2336 (Some(limit), None) => Some(limit as usize),
                (None, Some(_)) => None,
                (None, None) => None,
2339 };
2340 params = params.with_limit(search_limit);
2341 }
2342 let query = if columns.is_empty() {
2343 let mut indexed_columns = Vec::new();
2346 for field in self.dataset.schema().fields_pre_order() {
2347 let is_string_field = match field.data_type() {
2349 DataType::Utf8 | DataType::LargeUtf8 => true,
2350 DataType::List(inner_field) | DataType::LargeList(inner_field) => {
2351 matches!(
2352 inner_field.data_type(),
2353 DataType::Utf8 | DataType::LargeUtf8
2354 )
2355 }
2356 _ => false,
2357 };
2358
2359 if is_string_field {
2360 let column_path = if let Some(ancestors) =
2362 self.dataset.schema().field_ancestry_by_id(field.id)
2363 {
2364 let field_refs: Vec<&str> =
2365 ancestors.iter().map(|f| f.name.as_str()).collect();
2366 format_field_path(&field_refs)
2367 } else {
                    continue;
                };
2370
2371 let has_fts_index = self
2373 .dataset
2374 .load_scalar_index(
2375 ScalarIndexCriteria::default()
2376 .for_column(&column_path)
2377 .supports_fts(),
2378 )
2379 .await?
2380 .is_some();
2381
2382 if has_fts_index {
2383 indexed_columns.push(column_path);
2384 }
2385 }
2386 }
2387
2388 fill_fts_query_column(&query.query, &indexed_columns, false)?
2389 } else {
2390 query.query.clone()
2391 };
2392
2393 let prefilter_source = self
2397 .prefilter_source(
2398 filter_plan,
2399 self.fragments_covered_by_fts_query(&query).await?,
2400 )
2401 .await?;
2402 let fts_exec = self
2403 .plan_fts(&query, ¶ms, filter_plan, &prefilter_source)
2404 .await?;
2405 Ok(fts_exec)
2406 }
2407
2408 async fn plan_fts(
2409 &self,
2410 query: &FtsQuery,
2411 params: &FtsSearchParams,
2412 filter_plan: &FilterPlan,
2413 prefilter_source: &PreFilterSource,
2414 ) -> Result<Arc<dyn ExecutionPlan>> {
2415 let plan: Arc<dyn ExecutionPlan> = match query {
2416 FtsQuery::Match(query) => {
2417 self.plan_match_query(query, params, filter_plan, prefilter_source)
2418 .await?
2419 }
2420 FtsQuery::Phrase(query) => Arc::new(PhraseQueryExec::new(
2421 self.dataset.clone(),
2422 query.clone(),
2423 params.clone(),
2424 prefilter_source.clone(),
2425 )),
2426
2427 FtsQuery::Boost(query) => {
2428 let unlimited_params = params.clone().with_limit(None);
2432 let positive_exec = Box::pin(self.plan_fts(
2433 &query.positive,
2434 &unlimited_params,
2435 filter_plan,
2436 prefilter_source,
2437 ));
2438 let negative_exec = Box::pin(self.plan_fts(
2439 &query.negative,
2440 &unlimited_params,
2441 filter_plan,
2442 prefilter_source,
2443 ));
2444 let (positive_exec, negative_exec) =
2445 futures::future::try_join(positive_exec, negative_exec).await?;
2446 Arc::new(BoostQueryExec::new(
2447 query.clone(),
2448 params.clone(),
2449 positive_exec,
2450 negative_exec,
2451 ))
2452 }
2453
2454 FtsQuery::MultiMatch(query) => {
2455 let mut children = Vec::with_capacity(query.match_queries.len());
2456 for match_query in &query.match_queries {
2457 let child =
2458 self.plan_match_query(match_query, params, filter_plan, prefilter_source);
2459 children.push(child);
2460 }
2461 let children = futures::future::try_join_all(children).await?;
2462
2463 let schema = children[0].schema();
2464 let group_expr = vec![(
2465 expressions::col(ROW_ID, schema.as_ref())?,
2466 ROW_ID.to_string(),
2467 )];
2468
2469 let fts_node = Arc::new(UnionExec::new(children));
2470 let fts_node = Arc::new(RepartitionExec::try_new(
2471 fts_node,
2472 Partitioning::RoundRobinBatch(1),
2473 )?);
2474 let fts_node = Arc::new(AggregateExec::try_new(
2476 AggregateMode::Single,
2477 PhysicalGroupBy::new_single(group_expr),
2478 vec![Arc::new(
2479 AggregateExprBuilder::new(
2480 functions_aggregate::min_max::max_udaf(),
2481 vec![expressions::col(SCORE_COL, &schema)?],
2482 )
2483 .schema(schema.clone())
2484 .alias(SCORE_COL)
2485 .build()?,
2486 )],
2487 vec![None],
2488 fts_node,
2489 schema,
2490 )?);
2491 let sort_expr = PhysicalSortExpr {
2492 expr: expressions::col(SCORE_COL, fts_node.schema().as_ref())?,
2493 options: SortOptions {
2494 descending: true,
2495 nulls_first: false,
2496 },
2497 };
2498
2499 Arc::new(
2500 SortExec::new([sort_expr].into(), fts_node)
2501 .with_fetch(self.limit.map(|l| l as usize)),
2502 )
2503 }
2504 FtsQuery::Boolean(query) => {
2505 let unlimited_params = params.clone().with_limit(None);
2510
2511 let mut should = Vec::with_capacity(query.should.len());
2513 for subquery in &query.should {
2514 let plan = Box::pin(self.plan_fts(
2515 subquery,
2516 &unlimited_params,
2517 filter_plan,
2518 prefilter_source,
2519 ))
2520 .await?;
2521 should.push(plan);
2522 }
2523 let should = if should.is_empty() {
2524 Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
2525 } else if should.len() == 1 {
2526 should.pop().unwrap()
2527 } else {
2528 let unioned = Arc::new(UnionExec::new(should));
2529 Arc::new(RepartitionExec::try_new(
2530 unioned,
2531 Partitioning::RoundRobinBatch(1),
2532 )?)
2533 };
2534
2535 let mut must = None;
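                     // Intersect the "must" subqueries by hash-joining them on row id.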
2537 for query in &query.must {
2538 let plan = Box::pin(self.plan_fts(
2539 query,
2540 &unlimited_params,
2541 filter_plan,
2542 prefilter_source,
2543 ))
2544 .await?;
2545 if let Some(joined_plan) = must {
2546 must = Some(Arc::new(HashJoinExec::try_new(
2547 joined_plan,
2548 plan,
2549 vec![(
2550 Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
2551 Arc::new(Column::new_with_schema(ROW_ID, &FTS_SCHEMA)?),
2552 )],
2553 None,
2554 &datafusion_expr::JoinType::Inner,
2555 None,
2556 datafusion_physical_plan::joins::PartitionMode::CollectLeft,
2557 NullEquality::NullEqualsNothing,
2558 )?) as _);
2559 } else {
2560 must = Some(plan);
2561 }
2562 }
2563
2564 let mut must_not = Vec::with_capacity(query.must_not.len());
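                     // Union the "must_not" subqueries; BooleanQueryExec excludes their rows.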
2566 for query in &query.must_not {
2567 let plan = Box::pin(self.plan_fts(
2568 query,
2569 &unlimited_params,
2570 filter_plan,
2571 prefilter_source,
2572 ))
2573 .await?;
2574 must_not.push(plan);
2575 }
2576 let must_not = if must_not.is_empty() {
2577 Arc::new(EmptyExec::new(FTS_SCHEMA.clone()))
2578 } else if must_not.len() == 1 {
2579 must_not.pop().unwrap()
2580 } else {
2581 let unioned = Arc::new(UnionExec::new(must_not));
2582 Arc::new(RepartitionExec::try_new(
2583 unioned,
2584 Partitioning::RoundRobinBatch(1),
2585 )?)
2586 };
2587
2588 if query.should.is_empty() && must.is_none() {
2589 return Err(Error::invalid_input(
2590 "boolean query must have at least one should/must query".to_string(),
2591 location!(),
2592 ));
2593 }
2594
2595 Arc::new(BooleanQueryExec::new(
2596 query.clone(),
2597 params.clone(),
2598 should,
2599 must,
2600 must_not,
2601 ))
2602 }
2603 };
2604
2605 Ok(plan)
2606 }
2607
2608 async fn plan_match_query(
2609 &self,
2610 query: &MatchQuery,
2611 params: &FtsSearchParams,
2612 filter_plan: &FilterPlan,
2613 prefilter_source: &PreFilterSource,
2614 ) -> Result<Arc<dyn ExecutionPlan>> {
2615 let column = query
2616 .column
2617 .as_ref()
2618 .ok_or(Error::invalid_input(
2619 "the column must be specified in the query".to_string(),
2620 location!(),
2621 ))?
2622 .clone();
2623
2624 let index = self
2625 .dataset
2626 .load_scalar_index(
2627 ScalarIndexCriteria::default()
2628 .for_column(&column)
2629 .supports_fts(),
2630 )
2631 .await?
2632 .ok_or(Error::invalid_input(
2633 format!(
2634 "Column {} has no inverted index",
2635 query.column.as_ref().unwrap()
2636 ),
2637 location!(),
2638 ))?;
2639
2640 let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
2641 let mut match_plan: Arc<dyn ExecutionPlan> = Arc::new(MatchQueryExec::new(
2642 self.dataset.clone(),
2643 query.clone(),
2644 params.clone(),
2645 prefilter_source.clone(),
2646 ));
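             // Fragments not yet covered by the inverted index are searched with a flat
             // match below and merged with the indexed results.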
2647 if !unindexed_fragments.is_empty() {
2648 let mut columns = vec![column.clone()];
2649 if let Some(expr) = filter_plan.full_expr.as_ref() {
2650 let filter_columns = Planner::column_names_in_expr(expr);
2651 columns.extend(filter_columns);
2652 }
2653 let flat_fts_scan_schema = Arc::new(self.dataset.schema().project(&columns).unwrap());
2654 let mut scan_node = self.scan_fragments(
2655 true,
2656 false,
2657 false,
2658 flat_fts_scan_schema,
2659 Arc::new(unindexed_fragments),
2660 None,
2661 false,
2662 );
2663
2664 if let Some(expr) = filter_plan.full_expr.as_ref() {
2665 scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
2667 }
2668
2669 let flat_match_plan = Arc::new(FlatMatchQueryExec::new(
2670 self.dataset.clone(),
2671 query.clone(),
2672 params.clone(),
2673 scan_node,
2674 ));
2675
2676 match_plan = Arc::new(UnionExec::new(vec![match_plan, flat_match_plan]));
2677 match_plan = Arc::new(RepartitionExec::try_new(
2678 match_plan,
2679 Partitioning::RoundRobinBatch(1),
2680 )?);
2681 let sort_expr = PhysicalSortExpr {
2682 expr: expressions::col(SCORE_COL, match_plan.schema().as_ref())?,
2683 options: SortOptions {
2684 descending: true,
2685 nulls_first: false,
2686 },
2687 };
2688 match_plan =
2689 Arc::new(SortExec::new([sort_expr].into(), match_plan).with_fetch(params.limit));
2690 }
2691 Ok(match_plan)
2692 }
2693
2694 async fn vector_search(&self, filter_plan: &FilterPlan) -> Result<Arc<dyn ExecutionPlan>> {
2696 let Some(q) = self.nearest.as_ref() else {
2697 return Err(Error::invalid_input(
2698 "No nearest query".to_string(),
2699 location!(),
2700 ));
2701 };
2702
2703 let (vector_type, _) = get_vector_type(self.dataset.schema(), &q.column)?;
2705
2706 let column_id = self.dataset.schema().field_id(q.column.as_str())?;
2707 let use_index = self.nearest.as_ref().map(|q| q.use_index).unwrap_or(false);
2708 let indices = if use_index {
2709 self.dataset.load_indices().await?
2710 } else {
2711 Arc::new(vec![])
2712 };
2713 if let Some(index) = indices.iter().find(|i| i.fields.contains(&column_id)) {
2714 log::trace!("index found for vector search");
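                 // A refine factor of zero would leave zero candidates after the ANN
                 // search (it multiplies k), so reject it up front.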
2715 if matches!(q.refine_factor, Some(0)) {
2718 return Err(Error::invalid_input(
2719 "Refine factor cannot be zero".to_string(),
2720 location!(),
2721 ));
2722 }
2723
2724 let deltas = self.dataset.load_indices_by_name(&index.name).await?;
2726 let ann_node = match vector_type {
2727 DataType::FixedSizeList(_, _) => self.ann(q, &deltas, filter_plan).await?,
2728 DataType::List(_) => self.multivec_ann(q, &deltas, filter_plan).await?,
2729 _ => unreachable!(),
2730 };
2731
2732 let mut knn_node = if q.refine_factor.is_some() {
2733 let vector_projection = self
2734 .dataset
2735 .empty_projection()
2736 .union_column(&q.column, OnMissing::Error)
2737 .unwrap();
2738 let knn_node_with_vector = self.take(ann_node, vector_projection)?;
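                     // Re-rank with the metric type stored in the index so the refined
                     // distances are comparable to the ANN results.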
2739 let idx = self
2741 .dataset
2742 .open_vector_index(
2743 q.column.as_str(),
2744 &index.uuid.to_string(),
2745 &NoOpMetricsCollector,
2746 )
2747 .await?;
2748 let mut q = q.clone();
2749 q.metric_type = idx.metric_type();
2750 self.flat_knn(knn_node_with_vector, &q)?
2751 } else {
2752 ann_node
2753             };
                 if !self.fast_search {
2756 knn_node = self.knn_combined(q, index, knn_node, filter_plan).await?;
2757 }
2758
2759 Ok(knn_node)
2760 } else {
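             // No vector index to use: run the (pre)filtered scan and compute
             // distances by brute force.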
2761 let mut columns = vec![q.column.clone()];
2763 if let Some(refine_expr) = filter_plan.refine_expr.as_ref() {
2764 columns.extend(Planner::column_names_in_expr(refine_expr));
2765 }
2766 let mut vector_scan_projection = self
2767 .dataset
2768 .empty_projection()
2769 .with_row_id()
2770 .union_columns(&columns, OnMissing::Error)?;
2771
2772 vector_scan_projection.with_row_addr =
2773 self.projection_plan.physical_projection.with_row_addr;
2774
2775 let PlannedFilteredScan { mut plan, .. } = self
2776 .filtered_read(
2777 filter_plan,
2778 vector_scan_projection,
2779 true,
2780 None,
2781 None,
2782 true,
2783 )
2784 .await?;
2785
2786 if let Some(refine_expr) = &filter_plan.refine_expr {
2787 plan = Arc::new(LanceFilterExec::try_new(refine_expr.clone(), plan)?);
2788 }
2789 Ok(self.flat_knn(plan, q)?)
2790 }
2791 }
2792
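         /// Combine ANN results from indexed fragments with a brute-force search over
         /// fragments the index does not cover, re-ranking the union to the final top-k.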
2793 async fn knn_combined(
2795 &self,
2796 q: &Query,
2797 index: &IndexMetadata,
2798 mut knn_node: Arc<dyn ExecutionPlan>,
2799 filter_plan: &FilterPlan,
2800 ) -> Result<Arc<dyn ExecutionPlan>> {
2801 let unindexed_fragments = self.dataset.unindexed_fragments(&index.name).await?;
2803 if !unindexed_fragments.is_empty() {
2804 let idx = self
2807 .dataset
2808 .open_vector_index(
2809 q.column.as_str(),
2810 &index.uuid.to_string(),
2811 &NoOpMetricsCollector,
2812 )
2813 .await?;
2814 let mut q = q.clone();
2815 q.metric_type = idx.metric_type();
2816
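                 // The final flat re-rank over the combined results needs the vector
                 // column, so take it if the ANN output does not already include it.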
2817 if knn_node.schema().column_with_name(&q.column).is_none() {
2820 let vector_projection = self
2821 .dataset
2822 .empty_projection()
2823 .union_column(&q.column, OnMissing::Error)
2824 .unwrap();
2825 knn_node = self.take(knn_node, vector_projection)?;
2826 }
2827
2828 let mut columns = vec![q.column.clone()];
2829 if let Some(expr) = filter_plan.full_expr.as_ref() {
2830 let filter_columns = Planner::column_names_in_expr(expr);
2831 columns.extend(filter_columns);
2832 }
2833 let vector_scan_projection = Arc::new(self.dataset.schema().project(&columns).unwrap());
2834 let mut scan_node = self.scan_fragments(
2838 true,
2839 false,
2840 false,
2841 vector_scan_projection,
2842 Arc::new(unindexed_fragments),
2843 None,
2845 false,
2848 );
2849
2850 if let Some(expr) = filter_plan.full_expr.as_ref() {
2851 scan_node = Arc::new(LanceFilterExec::try_new(expr.clone(), scan_node)?);
2853 }
2854 let topk_appended = self.flat_knn(scan_node, &q)?;
2856
2857 let topk_appended = project(topk_appended, knn_node.schema().as_ref())?;
2861 assert!(topk_appended
2862 .schema()
2863 .equivalent_names_and_types(&knn_node.schema()));
2864 let unioned = UnionExec::new(vec![Arc::new(topk_appended), knn_node]);
2866 let unioned = RepartitionExec::try_new(
2868 Arc::new(unioned),
2869 datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
2870 )?;
2871 return self.flat_knn(Arc::new(unioned), &q);
2873 }
2874
2875 Ok(knn_node)
2876 }
2877
2878 #[async_recursion]
2879 async fn fragments_covered_by_index_query(
2880 &self,
2881 index_expr: &ScalarIndexExpr,
2882 ) -> Result<RoaringBitmap> {
2883 match index_expr {
2884 ScalarIndexExpr::And(lhs, rhs) => {
2885 Ok(self.fragments_covered_by_index_query(lhs).await?
2886 & self.fragments_covered_by_index_query(rhs).await?)
2887 }
2888 ScalarIndexExpr::Or(lhs, rhs) => Ok(self.fragments_covered_by_index_query(lhs).await?
2889 & self.fragments_covered_by_index_query(rhs).await?),
2890 ScalarIndexExpr::Not(expr) => self.fragments_covered_by_index_query(expr).await,
2891 ScalarIndexExpr::Query(search) => {
2892 let idx = self
2893 .dataset
2894 .load_scalar_index(ScalarIndexCriteria::default().with_name(&search.index_name))
2895 .await?
2896 .expect("Index not found even though it must have been found earlier");
2897 Ok(idx
2898 .fragment_bitmap
2899 .expect("scalar indices should always have a fragment bitmap"))
2900 }
2901 }
2902 }
2903
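         /// Split `fragments` into those covered by the given index query (according to
         /// each index's fragment bitmap) and those that are not.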
2904 async fn partition_frags_by_coverage(
2912 &self,
2913 index_expr: &ScalarIndexExpr,
2914 fragments: Arc<Vec<Fragment>>,
2915 ) -> Result<(Vec<Fragment>, Vec<Fragment>)> {
2916 let covered_frags = self.fragments_covered_by_index_query(index_expr).await?;
2917 let mut relevant_frags = Vec::with_capacity(fragments.len());
2918 let mut missing_frags = Vec::with_capacity(fragments.len());
2919 for fragment in fragments.iter() {
2920 if covered_frags.contains(fragment.id as u32) {
2921 relevant_frags.push(fragment.clone());
2922 } else {
2923 missing_frags.push(fragment.clone());
2924 }
2925 }
2926 Ok((relevant_frags, missing_frags))
2927 }
2928
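         /// Plan a scan that materializes matching rows via scalar indices, then unions
         /// in a filtered full scan of any fragments the indices do not cover.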
2929 async fn scalar_indexed_scan(
2932 &self,
2933 projection: Projection,
2934 filter_plan: &FilterPlan,
2935 fragments: Arc<Vec<Fragment>>,
2936 ) -> Result<Arc<dyn ExecutionPlan>> {
2937 log::trace!("scalar indexed scan");
2938 let index_expr = filter_plan.index_query.as_ref().unwrap();
2945
2946 let needs_recheck = index_expr.needs_recheck();
2947
2948 let (relevant_frags, missing_frags) = self
2950 .partition_frags_by_coverage(index_expr, fragments)
2951 .await?;
2952
2953 let mut plan: Arc<dyn ExecutionPlan> = Arc::new(MaterializeIndexExec::new(
2954 self.dataset.clone(),
2955 index_expr.clone(),
2956 Arc::new(relevant_frags),
2957 ));
2958
2959 let refine_expr = filter_plan.refine_expr.as_ref();
2960
2961 let needs_take =
2964 needs_recheck || projection.has_data_fields() || filter_plan.refine_expr.is_some();
2965 if needs_take {
2966 let mut take_projection = projection.clone();
2967 if needs_recheck {
2968 let filter_expr = index_expr.to_expr();
2970 let filter_cols = Planner::column_names_in_expr(&filter_expr);
2971 take_projection = take_projection.union_columns(filter_cols, OnMissing::Error)?;
2972 }
2973 if let Some(refine_expr) = refine_expr {
2974 let refine_cols = Planner::column_names_in_expr(refine_expr);
2975 take_projection = take_projection.union_columns(refine_cols, OnMissing::Error)?;
2976 }
2977 log::trace!("need to take additional columns for scalar_indexed_scan");
2978 plan = self.take(plan, take_projection)?;
2979 }
2980
2981 let post_take_filter = match (needs_recheck, refine_expr) {
2982 (false, None) => None,
2983 (true, None) => {
2984 Some(index_expr.to_expr())
2986 }
2987 (true, Some(_)) => Some(filter_plan.full_expr.as_ref().unwrap().clone()),
2988 (false, Some(refine_expr)) => Some(refine_expr.clone()),
2989 };
2990
2991 if let Some(post_take_filter) = post_take_filter {
2992 let planner = Planner::new(plan.schema());
2993 let optimized_filter = planner.optimize_expr(post_take_filter)?;
2994
2995 log::trace!("applying post-take filter to indexed scan");
2996 plan = Arc::new(LanceFilterExec::try_new(optimized_filter, plan)?);
2997 }
2998
2999 if self.projection_plan.physical_projection.with_row_addr {
3000 plan = Arc::new(AddRowAddrExec::try_new(plan, self.dataset.clone(), 0)?);
3001 }
3002
3003 let new_data_path: Option<Arc<dyn ExecutionPlan>> = if !missing_frags.is_empty() {
3004 log::trace!(
3005 "scalar_indexed_scan will need full scan of {} missing fragments",
3006 missing_frags.len()
3007 );
3008
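             // Fragments not covered by the index need a regular filtered scan.  The
             // result is projected to the same schema as the indexed path so the two
             // branches can be unioned below.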
3009 let filter = filter_plan.full_expr.as_ref().unwrap();
3022 let filter_cols = Planner::column_names_in_expr(filter);
3023 let scan_projection = projection.union_columns(filter_cols, OnMissing::Error)?;
3024
3025 let scan_schema = Arc::new(scan_projection.to_bare_schema());
3026 let scan_arrow_schema = Arc::new(scan_schema.as_ref().into());
3027 let planner = Planner::new(scan_arrow_schema);
3028 let optimized_filter = planner.optimize_expr(filter.clone())?;
3029
3030 let new_data_scan = self.scan_fragments(
3031 true,
3032 self.projection_plan.physical_projection.with_row_addr,
3033 false,
3034 scan_schema,
3035 missing_frags.into(),
3036 None,
3038 false,
3039 );
3040 let filtered = Arc::new(LanceFilterExec::try_new(optimized_filter, new_data_scan)?);
3041 Some(Arc::new(project(filtered, plan.schema().as_ref())?))
3042 } else {
3043 log::trace!("scalar_indexed_scan will not need full scan of any missing fragments");
3044 None
3045 };
3046
3047 if let Some(new_data_path) = new_data_path {
3048 let unioned = UnionExec::new(vec![plan, new_data_path]);
3049 let unioned = RepartitionExec::try_new(
3051 Arc::new(unioned),
3052 datafusion::physical_plan::Partitioning::RoundRobinBatch(1),
3053 )?;
3054 Ok(Arc::new(unioned))
3055 } else {
3056 Ok(plan)
3057 }
3058 }
3059
3060 fn get_io_buffer_size(&self) -> u64 {
3061 self.io_buffer_size.unwrap_or(*DEFAULT_IO_BUFFER_SIZE)
3062 }
3063
3064 pub(crate) fn scan(
3069 &self,
3070 with_row_id: bool,
3071 with_row_address: bool,
3072 with_make_deletions_null: bool,
3073 range: Option<Range<u64>>,
3074 projection: Arc<Schema>,
3075 ) -> Arc<dyn ExecutionPlan> {
3076 let fragments = if let Some(fragment) = self.fragments.as_ref() {
3077 Arc::new(fragment.clone())
3078 } else {
3079 self.dataset.fragments().clone()
3080 };
3081 let ordered = if self.ordering.is_some() || self.nearest.is_some() {
3082 false
3084 } else {
3085 self.ordered
3086 };
3087 self.scan_fragments(
3088 with_row_id,
3089 with_row_address,
3090 with_make_deletions_null,
3091 projection,
3092 fragments,
3093 range,
3094 ordered,
3095 )
3096 }
3097
3098 #[allow(clippy::too_many_arguments)]
3099 fn scan_fragments(
3100 &self,
3101 with_row_id: bool,
3102 with_row_address: bool,
3103 with_make_deletions_null: bool,
3104 projection: Arc<Schema>,
3105 fragments: Arc<Vec<Fragment>>,
3106 range: Option<Range<u64>>,
3107 ordered: bool,
3108 ) -> Arc<dyn ExecutionPlan> {
3109 log::trace!("scan_fragments covered {} fragments", fragments.len());
3110 let config = LanceScanConfig {
3111 batch_size: self.get_batch_size(),
3112 batch_readahead: self.batch_readahead,
3113 fragment_readahead: self.fragment_readahead,
3114 io_buffer_size: self.get_io_buffer_size(),
3115 with_row_id,
3116 with_row_address,
3117 with_make_deletions_null,
3118 ordered_output: ordered,
3119 };
3120 Arc::new(LanceScanExec::new(
3121 self.dataset.clone(),
3122 fragments,
3123 range,
3124 projection,
3125 config,
3126 ))
3127 }
3128
3129 fn pushdown_scan(
3130 &self,
3131 make_deletions_null: bool,
3132 filter_plan: &FilterPlan,
3133 ) -> Result<Arc<dyn ExecutionPlan>> {
3134 log::trace!("pushdown_scan");
3135
3136 let config = ScanConfig {
3137 batch_readahead: self.batch_readahead,
3138 fragment_readahead: self
3139 .fragment_readahead
3140 .unwrap_or(LEGACY_DEFAULT_FRAGMENT_READAHEAD),
3141 with_row_id: self.projection_plan.physical_projection.with_row_id,
3142 with_row_address: self.projection_plan.physical_projection.with_row_addr,
3143 make_deletions_null,
3144 ordered_output: self.ordered,
3145 file_reader_options: self
3146 .file_reader_options
3147 .clone()
3148 .or_else(|| self.dataset.file_reader_options.clone()),
3149 };
3150
3151 let fragments = if let Some(fragment) = self.fragments.as_ref() {
3152 Arc::new(fragment.clone())
3153 } else {
3154 self.dataset.fragments().clone()
3155 };
3156
3157 Ok(Arc::new(LancePushdownScanExec::try_new(
3158 self.dataset.clone(),
3159 fragments,
3160 Arc::new(self.projection_plan.physical_projection.to_bare_schema()),
3161 filter_plan.refine_expr.clone().unwrap(),
3162 config,
3163 )?))
3164 }
3165
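         /// Brute-force KNN: compute the distance from the query vector to every input
         /// row, optionally apply distance bounds, and keep the k nearest, dropping
         /// rows with null distances.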
3166 fn flat_knn(&self, input: Arc<dyn ExecutionPlan>, q: &Query) -> Result<Arc<dyn ExecutionPlan>> {
3168 let flat_dist = Arc::new(KNNVectorDistanceExec::try_new(
3169 input,
3170 &q.column,
3171 q.key.clone(),
3172 q.metric_type,
3173 )?);
3174
3175 let lower: Option<(Expr, Arc<dyn PhysicalExpr>)> = q
3176 .lower_bound
3177 .map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
3178 let logical = col(DIST_COL).gt_eq(lit(v));
3179 let schema = flat_dist.schema();
3180 let df_schema = DFSchema::try_from(schema)?;
3181 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3182 Ok::<(Expr, Arc<dyn PhysicalExpr>), _>((logical, physical))
3183 })
3184 .transpose()?;
3185
3186 let upper = q
3187 .upper_bound
3188 .map(|v| -> Result<(Expr, Arc<dyn PhysicalExpr>)> {
3189 let logical = col(DIST_COL).lt(lit(v));
3190 let schema = flat_dist.schema();
3191 let df_schema = DFSchema::try_from(schema)?;
3192 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3193 Ok::<(Expr, Arc<dyn PhysicalExpr>), _>((logical, physical))
3194 })
3195 .transpose()?;
3196
3197 let filter_expr = match (lower, upper) {
3198 (Some((llog, _)), Some((ulog, _))) => {
3199 let logical = llog.and(ulog);
3200 let schema = flat_dist.schema();
3201 let df_schema = DFSchema::try_from(schema)?;
3202 let physical = create_physical_expr(&logical, &df_schema, &ExecutionProps::new())?;
3203 Some((logical, physical))
3204 }
3205 (Some((llog, lphys)), None) => Some((llog, lphys)),
3206 (None, Some((ulog, uphys))) => Some((ulog, uphys)),
3207 (None, None) => None,
3208 };
3209
3210 let knn_plan: Arc<dyn ExecutionPlan> = if let Some(filter_expr) = filter_expr {
3211 Arc::new(LanceFilterExec::try_new(filter_expr.0, flat_dist)?)
3212 } else {
3213 flat_dist
3214 };
3215
3216 let sort = SortExec::new(
3218 [
3219 PhysicalSortExpr {
3220 expr: expressions::col(DIST_COL, knn_plan.schema().as_ref())?,
3221 options: SortOptions {
3222 descending: false,
3223 nulls_first: false,
3224 },
3225 },
3226 PhysicalSortExpr {
3227 expr: expressions::col(ROW_ID, knn_plan.schema().as_ref())?,
3228 options: SortOptions {
3229 descending: false,
3230 nulls_first: false,
3231 },
3232 },
3233 ]
3234 .into(),
3235 knn_plan,
3236 )
3237 .with_fetch(Some(q.k));
3238
3239 let logical_not_null = col(DIST_COL).is_not_null();
3240 let not_nulls = Arc::new(LanceFilterExec::try_new(logical_not_null, Arc::new(sort))?);
3241
3242 Ok(not_nulls)
3243 }
3244
3245 fn get_fragments_as_bitmap(&self) -> RoaringBitmap {
3246 if let Some(fragments) = &self.fragments {
3247 RoaringBitmap::from_iter(fragments.iter().map(|f| f.id as u32))
3248 } else {
3249 RoaringBitmap::from_iter(self.dataset.fragments().iter().map(|f| f.id as u32))
3250 }
3251 }
3252
3253 fn get_indexed_frags(&self, index: &[IndexMetadata]) -> RoaringBitmap {
3254 let all_fragments = self.get_fragments_as_bitmap();
3255
3256 let mut all_indexed_frags = RoaringBitmap::new();
3257 for idx in index {
3258 if let Some(fragmap) = idx.fragment_bitmap.as_ref() {
3259 all_indexed_frags |= fragmap;
3260 } else {
3261 return all_fragments;
3264 }
3265 }
3266
3267 all_indexed_frags & all_fragments
3268 }
3269
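         /// ANN search over the vector index deltas, sorted by distance and row id and
         /// truncated to `k * refine_factor` candidates.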
3270 async fn ann(
3272 &self,
3273 q: &Query,
3274 index: &[IndexMetadata],
3275 filter_plan: &FilterPlan,
3276 ) -> Result<Arc<dyn ExecutionPlan>> {
3277 let prefilter_source = self
3278 .prefilter_source(filter_plan, self.get_indexed_frags(index))
3279 .await?;
3280 let inner_fanout_search = new_knn_exec(self.dataset.clone(), index, q, prefilter_source)?;
3281 let sort_expr = PhysicalSortExpr {
3282 expr: expressions::col(DIST_COL, inner_fanout_search.schema().as_ref())?,
3283 options: SortOptions {
3284 descending: false,
3285 nulls_first: false,
3286 },
3287 };
3288 let sort_expr_row_id = PhysicalSortExpr {
3289 expr: expressions::col(ROW_ID, inner_fanout_search.schema().as_ref())?,
3290 options: SortOptions {
3291 descending: false,
3292 nulls_first: false,
3293 },
3294 };
3295 Ok(Arc::new(
3296 SortExec::new([sort_expr, sort_expr_row_id].into(), inner_fanout_search)
3297 .with_fetch(Some(q.k * q.refine_factor.unwrap_or(1) as usize)),
3298 ))
3299 }
3300
3301 async fn multivec_ann(
3303 &self,
3304 q: &Query,
3305 index: &[IndexMetadata],
3306 filter_plan: &FilterPlan,
3307 ) -> Result<Arc<dyn ExecutionPlan>> {
3308 let over_fetch_factor = *DEFAULT_XTR_OVERFETCH;
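             // Multivector search: run one ANN query per sub-vector (over-fetching by
             // the XTR over-fetch factor) and let MultivectorScoringExec combine the
             // per-query results before the final top-k sort.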
3313
3314 let prefilter_source = self
3315 .prefilter_source(filter_plan, self.get_indexed_frags(index))
3316 .await?;
3317 let dim = get_vector_dim(self.dataset.schema(), &q.column)?;
3318
3319 let num_queries = q.key.len() / dim;
3320 let new_queries = (0..num_queries)
3321 .map(|i| q.key.slice(i * dim, dim))
3322 .map(|query_vec| {
3323 let mut new_query = q.clone();
3324 new_query.key = query_vec;
3325 new_query.refine_factor = Some(over_fetch_factor);
3329 new_query
3330 });
3331 let mut ann_nodes = Vec::with_capacity(new_queries.len());
3332 for query in new_queries {
3333 let ann_node = new_knn_exec(
3335 self.dataset.clone(),
3336 index,
3337 &query,
3338 prefilter_source.clone(),
3339 )?;
3340 let sort_expr = PhysicalSortExpr {
3341 expr: expressions::col(DIST_COL, ann_node.schema().as_ref())?,
3342 options: SortOptions {
3343 descending: false,
3344 nulls_first: false,
3345 },
3346 };
3347 let sort_expr_row_id = PhysicalSortExpr {
3348 expr: expressions::col(ROW_ID, ann_node.schema().as_ref())?,
3349 options: SortOptions {
3350 descending: false,
3351 nulls_first: false,
3352 },
3353 };
3354 let ann_node = Arc::new(
3355 SortExec::new([sort_expr, sort_expr_row_id].into(), ann_node)
3356 .with_fetch(Some(q.k * over_fetch_factor as usize)),
3357 );
3358 ann_nodes.push(ann_node as Arc<dyn ExecutionPlan>);
3359 }
3360
3361 let ann_node = Arc::new(MultivectorScoringExec::try_new(ann_nodes, q.clone())?);
3362
3363 let sort_expr = PhysicalSortExpr {
3364 expr: expressions::col(DIST_COL, ann_node.schema().as_ref())?,
3365 options: SortOptions {
3366 descending: false,
3367 nulls_first: false,
3368 },
3369 };
3370 let sort_expr_row_id = PhysicalSortExpr {
3371 expr: expressions::col(ROW_ID, ann_node.schema().as_ref())?,
3372 options: SortOptions {
3373 descending: false,
3374 nulls_first: false,
3375 },
3376 };
3377 let ann_node = Arc::new(
3378 SortExec::new([sort_expr, sort_expr_row_id].into(), ann_node)
3379 .with_fetch(Some(q.k * q.refine_factor.unwrap_or(1) as usize)),
3380 );
3381
3382 Ok(ann_node)
3383 }
3384
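         /// Build the prefilter input for an indexed search: either the result of an
         /// exact scalar index query, or a filtered read of row ids from the fragments
         /// the target index covers.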
3385 async fn prefilter_source(
3390 &self,
3391 filter_plan: &FilterPlan,
3392 required_frags: RoaringBitmap,
3393 ) -> Result<PreFilterSource> {
3394 if filter_plan.is_empty() {
3395 log::trace!("no filter plan, no prefilter");
3396 return Ok(PreFilterSource::None);
3397 }
3398
3399 let fragments = Arc::new(
3400 self.dataset
3401 .manifest
3402 .fragments
3403 .iter()
3404 .filter(|f| required_frags.contains(f.id as u32))
3405 .cloned()
3406 .collect::<Vec<_>>(),
3407 );
3408
3409 if filter_plan.is_exact_index_search() && self.fragments.is_none() {
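             // If the filter is an exact scalar index search (and no fragment subset was
             // requested), the index result itself can serve as the prefilter, as long
             // as the index covers every required fragment.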
3415 let index_query = filter_plan.index_query.as_ref().expect_ok()?;
3416 let (_, missing_frags) = self
3417 .partition_frags_by_coverage(index_query, fragments.clone())
3418 .await?;
3419
3420 if missing_frags.is_empty() {
3421 log::trace!("prefilter entirely satisfied by exact index search");
3422 return Ok(PreFilterSource::ScalarIndexQuery(Arc::new(
3427 ScalarIndexExec::new(self.dataset.clone(), index_query.clone()),
3428 )));
3429 } else {
3430 log::trace!("exact index search did not cover all fragments");
3431 }
3432 }
3433
3434 log::trace!(
3436 "prefilter is a filtered read of {} fragments",
3437 fragments.len()
3438 );
3439 let PlannedFilteredScan { plan, .. } = self
3440 .filtered_read(
3441 filter_plan,
3442 self.dataset.empty_projection().with_row_id(),
3443 false,
3444 Some(fragments),
3445 None,
3446 true,
3447 )
3448 .await?;
3449 Ok(PreFilterSource::FilteredRowIds(plan))
3450 }
3451
3452 fn take(
3454 &self,
3455 input: Arc<dyn ExecutionPlan>,
3456 output_projection: Projection,
3457 ) -> Result<Arc<dyn ExecutionPlan>> {
3458 let coalesced = Arc::new(CoalesceBatchesExec::new(
3459 input.clone(),
3460 self.get_batch_size(),
3461 ));
3462 if let Some(take_plan) =
3463 TakeExec::try_new(self.dataset.clone(), coalesced, output_projection)?
3464 {
3465 Ok(Arc::new(take_plan))
3466 } else {
3467 Ok(input)
3469 }
3470 }
3471
3472 fn limit_node(&self, plan: Arc<dyn ExecutionPlan>) -> Arc<dyn ExecutionPlan> {
3474 Arc::new(GlobalLimitExec::new(
3475 plan,
3476 *self.offset.as_ref().unwrap_or(&0) as usize,
3477 self.limit.map(|l| l as usize),
3478 ))
3479 }
3480
3481 #[instrument(level = "info", skip(self))]
3482 pub async fn analyze_plan(&self) -> Result<String> {
3483 let plan = self.create_plan().await?;
3484 let res = analyze_plan(
3485 plan,
3486 LanceExecutionOptions {
3487 batch_size: self.batch_size,
3488 ..Default::default()
3489 },
3490 )
3491 .await;
3492 res
3493 }
3494
3495 #[instrument(level = "info", skip(self))]
3496 pub async fn explain_plan(&self, verbose: bool) -> Result<String> {
3497 let plan = self.create_plan().await?;
3498 let display = DisplayableExecutionPlan::new(plan.as_ref());
3499
3500 Ok(format!("{}", display.indent(verbose)))
3501 }
3502}
3503
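/// A stream of [`RecordBatch`]es produced by a dataset scan.
///
/// Wraps a DataFusion stream, converting its errors into Lance errors and
/// instrumenting polling with a tracing span.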
3504#[pin_project::pin_project]
3508pub struct DatasetRecordBatchStream {
3509 #[pin]
3510 exec_node: SendableRecordBatchStream,
3511 span: Span,
3512}
3513
3514impl DatasetRecordBatchStream {
3515 pub fn new(exec_node: SendableRecordBatchStream) -> Self {
3516 let exec_node = wrap_json_stream_for_reading(exec_node);
3520
3521 let span = info_span!("DatasetRecordBatchStream");
3522 Self { exec_node, span }
3523 }
3524}
3525
3526impl RecordBatchStream for DatasetRecordBatchStream {
3527 fn schema(&self) -> SchemaRef {
3528 self.exec_node.schema()
3529 }
3530}
3531
3532impl Stream for DatasetRecordBatchStream {
3533 type Item = Result<RecordBatch>;
3534
3535 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
3536 let mut this = self.project();
3537 let _guard = this.span.enter();
3538 match this.exec_node.poll_next_unpin(cx) {
3539 Poll::Ready(result) => {
3540 Poll::Ready(result.map(|r| r.map_err(|e| Error::io(e.to_string(), location!()))))
3541 }
3542 Poll::Pending => Poll::Pending,
3543 }
3544 }
3545}
3546
3547impl From<DatasetRecordBatchStream> for SendableRecordBatchStream {
3548 fn from(stream: DatasetRecordBatchStream) -> Self {
3549 stream.exec_node
3550 }
3551}
3552
3553#[cfg(test)]
3554pub mod test_dataset {
3555
3556 use super::*;
3557
3558 use std::{collections::HashMap, vec};
3559
3560 use arrow_array::{
3561 ArrayRef, FixedSizeListArray, Int32Array, RecordBatch, RecordBatchIterator, StringArray,
3562 };
3563 use arrow_schema::{ArrowError, DataType};
3564 use lance_core::utils::tempfile::TempStrDir;
3565 use lance_file::version::LanceFileVersion;
3566 use lance_index::{
3567 scalar::{inverted::tokenizer::InvertedIndexParams, ScalarIndexParams},
3568 IndexType,
3569 };
3570
3571 use crate::arrow::*;
3572 use crate::dataset::WriteParams;
3573 use crate::index::vector::VectorIndexParams;
3574
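         /// Test fixture: a small dataset with an `i` (Int32), `s` (Utf8), and `vec`
         /// (FixedSizeList<Float32>) column spread over several fragments, plus helpers
         /// for building vector/scalar/FTS indices and appending unindexed rows.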
3575 pub struct TestVectorDataset {
3585 pub tmp_dir: TempStrDir,
3586 pub schema: Arc<ArrowSchema>,
3587 pub dataset: Dataset,
3588 dimension: u32,
3589 }
3590
3591 impl TestVectorDataset {
3592 pub async fn new(
3593 data_storage_version: LanceFileVersion,
3594 stable_row_ids: bool,
3595 ) -> Result<Self> {
3596 Self::new_with_dimension(data_storage_version, stable_row_ids, 32).await
3597 }
3598
3599 pub async fn new_with_dimension(
3600 data_storage_version: LanceFileVersion,
3601 stable_row_ids: bool,
3602 dimension: u32,
3603 ) -> Result<Self> {
3604 let path = TempStrDir::default();
3605
3606 let metadata: HashMap<String, String> =
3608 vec![("dataset".to_string(), "vector".to_string())]
3609 .into_iter()
3610 .collect();
3611
3612 let schema = Arc::new(ArrowSchema::new_with_metadata(
3613 vec![
3614 ArrowField::new("i", DataType::Int32, true),
3615 ArrowField::new("s", DataType::Utf8, true),
3616 ArrowField::new(
3617 "vec",
3618 DataType::FixedSizeList(
3619 Arc::new(ArrowField::new("item", DataType::Float32, true)),
3620 dimension as i32,
3621 ),
3622 true,
3623 ),
3624 ],
3625 metadata,
3626 ));
3627
3628 let batches: Vec<RecordBatch> = (0..5)
3629 .map(|i| {
3630 let vector_values: Float32Array =
3631 (0..dimension * 80).map(|v| v as f32).collect();
3632 let vectors =
3633 FixedSizeListArray::try_new_from_values(vector_values, dimension as i32)
3634 .unwrap();
3635 RecordBatch::try_new(
3636 schema.clone(),
3637 vec![
3638 Arc::new(Int32Array::from_iter_values(i * 80..(i + 1) * 80)),
3639 Arc::new(StringArray::from_iter_values(
3640 (i * 80..(i + 1) * 80).map(|v| format!("s-{}", v)),
3641 )),
3642 Arc::new(vectors),
3643 ],
3644 )
3645 })
3646 .collect::<std::result::Result<Vec<_>, ArrowError>>()?;
3647
3648 let params = WriteParams {
3649 max_rows_per_group: 10,
3650 max_rows_per_file: 200,
3651 data_storage_version: Some(data_storage_version),
3652 enable_stable_row_ids: stable_row_ids,
3653 ..Default::default()
3654 };
3655 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
3656
3657 let dataset = Dataset::write(reader, &path, Some(params)).await?;
3658
3659 Ok(Self {
3660 tmp_dir: path,
3661 schema,
3662 dataset,
3663 dimension,
3664 })
3665 }
3666
3667 pub async fn make_vector_index(&mut self) -> Result<()> {
3668 let params = VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2);
3669 self.dataset
3670 .create_index(
3671 &["vec"],
3672 IndexType::Vector,
3673 Some("idx".to_string()),
3674             &params,
3675 true,
3676 )
3677 .await
3678 }
3679
3680 pub async fn make_scalar_index(&mut self) -> Result<()> {
3681 self.dataset
3682 .create_index(
3683 &["i"],
3684 IndexType::Scalar,
3685 None,
3686 &ScalarIndexParams::default(),
3687 true,
3688 )
3689 .await
3690 }
3691
3692 pub async fn make_fts_index(&mut self) -> Result<()> {
3693 self.dataset
3694 .create_index(
3695 &["s"],
3696 IndexType::Inverted,
3697 None,
3698 &InvertedIndexParams::default(),
3699 true,
3700 )
3701 .await
3702 }
3703
3704 pub async fn append_new_data(&mut self) -> Result<()> {
3705 let vector_values: Float32Array = (0..10)
3706 .flat_map(|i| vec![i as f32; self.dimension as usize].into_iter())
3707 .collect();
3708 let new_vectors =
3709 FixedSizeListArray::try_new_from_values(vector_values, self.dimension as i32)
3710 .unwrap();
3711 let new_data: Vec<ArrayRef> = vec![
3712                 Arc::new(Int32Array::from_iter_values(400..410)),
                     Arc::new(StringArray::from_iter_values(
3714 (400..410).map(|v| format!("s-{}", v)),
3715 )),
3716 Arc::new(new_vectors),
3717 ];
3718 let reader = RecordBatchIterator::new(
3719 vec![RecordBatch::try_new(self.schema.clone(), new_data).unwrap()]
3720 .into_iter()
3721 .map(Ok),
3722 self.schema.clone(),
3723 );
3724 self.dataset.append(reader, None).await?;
3725 Ok(())
3726 }
3727 }
3728}
3729
3730#[cfg(test)]
3731mod test {
3732
3733 use std::collections::BTreeSet;
3734 use std::sync::Mutex;
3735 use std::vec;
3736
3737 use arrow::array::as_primitive_array;
3738 use arrow::datatypes::{Int32Type, Int64Type};
3739 use arrow_array::cast::AsArray;
3740 use arrow_array::types::{Float32Type, UInt64Type};
3741 use arrow_array::{
3742 ArrayRef, FixedSizeListArray, Float16Array, Int32Array, Int64Array, LargeStringArray,
3743 ListArray, PrimitiveArray, RecordBatchIterator, StringArray, StructArray,
3744 };
3745 use arrow_buffer::{OffsetBuffer, ScalarBuffer};
3746 use arrow_ord::sort::sort_to_indices;
3747 use arrow_schema::Fields;
3748 use arrow_select::take;
3749 use datafusion::logical_expr::{col, lit};
3750 use half::f16;
3751 use lance_arrow::SchemaExt;
3752 use lance_core::utils::tempfile::TempStrDir;
3753 use lance_datagen::{array, gen_batch, BatchCount, ByteCount, Dimension, RowCount};
3754 use lance_file::version::LanceFileVersion;
3755 use lance_index::scalar::inverted::query::{MatchQuery, PhraseQuery};
3756 use lance_index::vector::hnsw::builder::HnswBuildParams;
3757 use lance_index::vector::ivf::IvfBuildParams;
3758 use lance_index::vector::pq::PQBuildParams;
3759 use lance_index::vector::sq::builder::SQBuildParams;
3760 use lance_index::{scalar::ScalarIndexParams, IndexType};
3761 use lance_io::object_store::ObjectStoreParams;
3762 use lance_linalg::distance::DistanceType;
3763 use lance_testing::datagen::{BatchGenerator, IncrementingInt32, RandomVector};
3764 use rstest::rstest;
3765
3766 use super::*;
3767 use crate::arrow::*;
3768 use crate::dataset::optimize::{compact_files, CompactionOptions};
3769 use crate::dataset::scanner::test_dataset::TestVectorDataset;
3770 use crate::dataset::WriteMode;
3771 use crate::dataset::WriteParams;
3772 use crate::index::vector::{StageParams, VectorIndexParams};
3773 use crate::utils::test::{
3774 assert_plan_node_equals, DatagenExt, FragmentCount, FragmentRowCount, IoStats,
3775 IoTrackingStore,
3776 };
3777
3778 #[tokio::test]
3779 async fn test_batch_size() {
3780 let schema = Arc::new(ArrowSchema::new(vec![
3781 ArrowField::new("i", DataType::Int32, true),
3782 ArrowField::new("s", DataType::Utf8, true),
3783 ]));
3784
3785 let batches: Vec<RecordBatch> = (0..5)
3786 .map(|i| {
3787 RecordBatch::try_new(
3788 schema.clone(),
3789 vec![
3790 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
3791 Arc::new(StringArray::from_iter_values(
3792 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
3793 )),
3794 ],
3795 )
3796 .unwrap()
3797 })
3798 .collect();
3799
3800 for use_filter in [false, true] {
3801 let test_dir = TempStrDir::default();
3802 let test_uri = &test_dir;
3803 let write_params = WriteParams {
3804 max_rows_per_file: 40,
3805 max_rows_per_group: 10,
3806 ..Default::default()
3807 };
3808 let batches =
3809 RecordBatchIterator::new(batches.clone().into_iter().map(Ok), schema.clone());
3810 Dataset::write(batches, test_uri, Some(write_params))
3811 .await
3812 .unwrap();
3813
3814 let dataset = Dataset::open(test_uri).await.unwrap();
3815 let mut builder = dataset.scan();
3816 builder.batch_size(8);
3817 if use_filter {
3818 builder.filter("i IS NOT NULL").unwrap();
3819 }
3820 let mut stream = builder.try_into_stream().await.unwrap();
3821 let mut rows_read = 0;
3822 while let Some(next) = stream.next().await {
3823 let next = next.unwrap();
3824 let expected = 8.min(100 - rows_read);
3825 assert_eq!(next.num_rows(), expected);
3826 rows_read += next.num_rows();
3827 }
3828 }
3829 }
3830
3831 #[tokio::test]
3832 async fn test_strict_batch_size() {
3833 let dataset = lance_datagen::gen_batch()
3834 .col("x", array::step::<Int32Type>())
3835 .anon_col(array::step::<Int64Type>())
3836 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
3837 .await
3838 .unwrap();
3839
3840 let mut scan = dataset.scan();
3841 scan.batch_size(10)
3842 .strict_batch_size(true)
3843 .filter("x % 2 == 0")
3844 .unwrap();
3845
3846 let batches = scan
3847 .try_into_stream()
3848 .await
3849 .unwrap()
3850 .try_collect::<Vec<_>>()
3851 .await
3852 .unwrap();
3853
3854 let batch_sizes = batches.iter().map(|b| b.num_rows()).collect::<Vec<_>>();
3855 assert_eq!(batch_sizes, vec![10, 10, 1]);
3856 }
3857
3858 #[tokio::test]
3859 async fn test_column_not_exist() {
3860 let dataset = lance_datagen::gen_batch()
3861 .col("x", array::step::<Int32Type>())
3862 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
3863 .await
3864 .unwrap();
3865
3866 let check_err_msg = |r: Result<DatasetRecordBatchStream>| {
3867 let Err(err) = r else {
3868 panic!(
3869 "Expected an error to be raised saying column y is not found but got no error"
3870 )
3871 };
3872
3873 assert!(
3874 err.to_string().contains("No field named y"),
3875 "Expected error to contain 'No field named y' but got {}",
3876 err
3877 );
3878 };
3879
3880 let mut scan = dataset.scan();
3881 scan.project(&["x", "y"]).unwrap();
3882 check_err_msg(scan.try_into_stream().await);
3883
3884 let mut scan = dataset.scan();
3885 scan.project(&["y"]).unwrap();
3886 check_err_msg(scan.try_into_stream().await);
3887
3888 let mut scan = dataset.scan();
3891 scan.project_with_transform(&[("foo", "1")]).unwrap();
3892 match scan.try_into_stream().await {
3893 Ok(_) => panic!("Expected an error to be raised saying not supported"),
3894 Err(e) => {
3895 assert!(
3896 e.to_string().contains("Received only dynamic expressions"),
3897 "Expected error to contain 'Received only dynamic expressions' but got {}",
3898 e
3899 );
3900 }
3901 }
3902 }
3903
3904 #[tokio::test]
3905 async fn test_filter_with_nullable_struct_list_schema_mismatch() {
3906 let test_uri = TempStrDir::default();
3907
3908 let struct_fields = Fields::from(vec![
3909 Arc::new(ArrowField::new("company_id", DataType::Int64, true)),
3910 Arc::new(ArrowField::new("company_name", DataType::Utf8, true)),
3911 Arc::new(ArrowField::new("count", DataType::Int64, true)),
3912 ]);
3913 let list_item_field = Arc::new(ArrowField::new(
3914 "item",
3915 DataType::Struct(struct_fields.clone()),
3916 true,
3917 ));
3918
3919 let schema = Arc::new(ArrowSchema::new(vec![
3920 ArrowField::new("cid", DataType::Utf8, false),
3921 ArrowField::new("companies", DataType::List(list_item_field.clone()), true),
3922 ]));
3923
3924 let companies_values: ArrayRef = Arc::new(
3925 StructArray::try_new(
3926 struct_fields.clone(),
3927 vec![
3928 Arc::new(Int64Array::from(vec![
3929 Option::<i64>::None,
3930 None,
3931 None,
3932 None,
3933 Some(100),
3934 Some(101),
3935 ])) as ArrayRef,
3936 Arc::new(StringArray::from(vec![
3937 Some("Google"),
3938 Some("Microsoft"),
3939 None,
3940 None,
3941 Some("Apple"),
3942 Some("Amazon"),
3943 ])) as ArrayRef,
3944 Arc::new(Int64Array::from(vec![
3945 Option::<i64>::None,
3946 None,
3947 None,
3948 None,
3949 Some(50),
3950 Some(60),
3951 ])) as ArrayRef,
3952 ],
3953 None,
3954 )
3955 .unwrap(),
3956 );
3957
3958 let companies_offsets = OffsetBuffer::new(ScalarBuffer::<i32>::from(vec![0, 2, 4, 6]));
3959 let companies_array: ArrayRef = Arc::new(ListArray::new(
3960 list_item_field,
3961 companies_offsets,
3962 companies_values,
3963 None,
3964 ));
3965
3966 let cid_array: ArrayRef = Arc::new(StringArray::from(vec!["1", "2", "3"]));
3967 let batch = RecordBatch::try_new(
3968 schema.clone(),
3969 vec![cid_array.clone(), companies_array.clone()],
3970 )
3971 .unwrap();
3972
3973 let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());
3974 Dataset::write(reader, &test_uri, None).await.unwrap();
3975
3976 let dataset = Dataset::open(&test_uri).await.unwrap();
3977 let mut scan = dataset.scan();
3978 scan.filter("cid = '1'").unwrap();
3979
3980 let batches = scan
3981 .try_into_stream()
3982 .await
3983 .unwrap()
3984 .try_collect::<Vec<_>>()
3985 .await
3986 .expect("filter should not trigger schema mismatch");
3987
3988 assert_eq!(batches.len(), 1);
3989 let companies_col = batches[0]
3990 .column_by_name("companies")
3991 .expect("companies column missing");
3992 assert_eq!(companies_col.len(), 1);
3993 }
3994
3995 #[cfg(not(windows))]
3996 #[tokio::test]
3997 async fn test_local_object_store() {
3998 let schema = Arc::new(ArrowSchema::new(vec![
3999 ArrowField::new("i", DataType::Int32, true),
4000 ArrowField::new("s", DataType::Utf8, true),
4001 ]));
4002
4003 let batches: Vec<RecordBatch> = (0..5)
4004 .map(|i| {
4005 RecordBatch::try_new(
4006 schema.clone(),
4007 vec![
4008 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20)),
4009 Arc::new(StringArray::from_iter_values(
4010 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
4011 )),
4012 ],
4013 )
4014 .unwrap()
4015 })
4016 .collect();
4017
4018 let test_dir = TempStrDir::default();
4019 let test_uri = &test_dir;
4020 let write_params = WriteParams {
4021 max_rows_per_file: 40,
4022 max_rows_per_group: 10,
4023 ..Default::default()
4024 };
4025 let batches = RecordBatchIterator::new(batches.clone().into_iter().map(Ok), schema.clone());
4026 Dataset::write(batches, test_uri, Some(write_params))
4027 .await
4028 .unwrap();
4029
4030 let dataset = Dataset::open(&format!("file-object-store://{}", test_uri))
4031 .await
4032 .unwrap();
4033 let mut builder = dataset.scan();
4034 builder.batch_size(8);
4035 let mut stream = builder.try_into_stream().await.unwrap();
4036 let mut rows_read = 0;
4037 while let Some(next) = stream.next().await {
4038 let next = next.unwrap();
4039 let expected = 8.min(100 - rows_read);
4040 assert_eq!(next.num_rows(), expected);
4041 rows_read += next.num_rows();
4042 }
4043 }
4044
4045 #[tokio::test]
4046 async fn test_filter_parsing() -> Result<()> {
4047 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false).await?;
4048 let dataset = &test_ds.dataset;
4049
4050 let mut scan = dataset.scan();
4051 assert!(scan.filter.is_none());
4052
4053 scan.filter("i > 50")?;
4054 assert_eq!(scan.get_filter().unwrap(), Some(col("i").gt(lit(50))));
4055
4056 for use_stats in [false, true] {
4057 let batches = scan
4058 .project(&["s"])?
4059 .use_stats(use_stats)
4060 .try_into_stream()
4061 .await?
4062 .try_collect::<Vec<_>>()
4063 .await?;
4064 let batch = concat_batches(&batches[0].schema(), &batches)?;
4065
4066 let expected_batch = RecordBatch::try_new(
4067 Arc::new(test_ds.schema.project(&[1])?),
4069 vec![Arc::new(StringArray::from_iter_values(
4070 (51..400).map(|v| format!("s-{}", v)),
4071 ))],
4072 )?;
4073 assert_eq!(batch, expected_batch);
4074 }
4075 Ok(())
4076 }
4077
4078 #[tokio::test]
4079 async fn test_nested_projection() {
4080 let point_fields: Fields = vec![
4081 ArrowField::new("x", DataType::Float32, true),
4082 ArrowField::new("y", DataType::Float32, true),
4083 ]
4084 .into();
4085 let metadata_fields: Fields = vec![
4086 ArrowField::new("location", DataType::Struct(point_fields), true),
4087 ArrowField::new("age", DataType::Int32, true),
4088 ]
4089 .into();
4090 let metadata_field = ArrowField::new("metadata", DataType::Struct(metadata_fields), true);
4091 let schema = Arc::new(ArrowSchema::new(vec![
4092 metadata_field,
4093 ArrowField::new("idx", DataType::Int32, true),
4094 ]));
4095 let data = lance_datagen::rand(&schema)
4096 .into_ram_dataset(FragmentCount::from(7), FragmentRowCount::from(6))
4097 .await
4098 .unwrap();
4099
4100 let mut scan = data.scan();
4101 scan.project(&["metadata.location.x", "metadata.age"])
4102 .unwrap();
4103 let batch = scan.try_into_batch().await.unwrap();
4104
4105 assert_eq!(
4106 batch.schema().as_ref(),
4107 &ArrowSchema::new(vec![
4108 ArrowField::new("metadata.location.x", DataType::Float32, true),
4109 ArrowField::new("metadata.age", DataType::Int32, true),
4110 ])
4111 );
4112
4113 let take_schema = data.schema().project_by_ids(&[0, 2, 4], false);
4117
4118 let taken = data.take_rows(&[0, 5], take_schema).await.unwrap();
4119
4120 let part_point_fields = Fields::from(vec![ArrowField::new("x", DataType::Float32, true)]);
4122 let part_metadata_fields = Fields::from(vec![
4123 ArrowField::new("location", DataType::Struct(part_point_fields), true),
4124 ArrowField::new("age", DataType::Int32, true),
4125 ]);
4126 let part_schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
4127 "metadata",
4128 DataType::Struct(part_metadata_fields),
4129 true,
4130 )]));
4131
4132 assert_eq!(taken.schema(), part_schema);
4133 }
4134
4135 #[rstest]
4136 #[tokio::test]
4137 async fn test_limit(
4138 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4139 data_storage_version: LanceFileVersion,
4140 ) -> Result<()> {
4141 let test_ds = TestVectorDataset::new(data_storage_version, false).await?;
4142 let dataset = &test_ds.dataset;
4143
4144 let full_data = dataset.scan().try_into_batch().await?.slice(19, 2);
4145
4146 let actual = dataset
4147 .scan()
4148 .limit(Some(2), Some(19))?
4149 .try_into_batch()
4150 .await?;
4151
4152 assert_eq!(actual.num_rows(), 2);
4153 assert_eq!(actual, full_data);
4154 Ok(())
4155 }
4156
4157 #[rstest]
4158 #[tokio::test]
4159 async fn test_knn_nodes(
4160 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4161 data_storage_version: LanceFileVersion,
4162 #[values(false, true)] stable_row_ids: bool,
4163 #[values(false, true)] build_index: bool,
4164 ) {
4165 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4166 .await
4167 .unwrap();
4168 if build_index {
4169 test_ds.make_vector_index().await.unwrap();
4170 }
4171 let dataset = &test_ds.dataset;
4172
4173 let mut scan = dataset.scan();
4174 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4175 scan.nearest("vec", &key, 5).unwrap();
4176 scan.refine(5);
4177
4178 let batch = scan.try_into_batch().await.unwrap();
4179
4180 assert_eq!(batch.num_rows(), 5);
4181 assert_eq!(
4182 batch.schema().as_ref(),
4183 &ArrowSchema::new(vec![
4184 ArrowField::new("i", DataType::Int32, true),
4185 ArrowField::new("s", DataType::Utf8, true),
4186 ArrowField::new(
4187 "vec",
4188 DataType::FixedSizeList(
4189 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4190 32,
4191 ),
4192 true,
4193 ),
4194 ArrowField::new(DIST_COL, DataType::Float32, true),
4195 ])
4196 .with_metadata([("dataset".into(), "vector".into())].into())
4197 );
4198
4199 let expected_i = BTreeSet::from_iter(vec![1, 81, 161, 241, 321]);
4200 let column_i = batch.column_by_name("i").unwrap();
4201 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4202 .values()
4203 .iter()
4204 .copied()
4205 .collect();
4206 assert_eq!(expected_i, actual_i);
4207 }
4208
4209 #[rstest]
4210 #[tokio::test]
4211 async fn test_can_project_distance() {
4212 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, true)
4213 .await
4214 .unwrap();
4215 let dataset = &test_ds.dataset;
4216
4217 let mut scan = dataset.scan();
4218 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4219 scan.nearest("vec", &key, 5).unwrap();
4220 scan.refine(5);
4221 scan.project(&["_distance"]).unwrap();
4222
4223 let batch = scan.try_into_batch().await.unwrap();
4224
4225 assert_eq!(batch.num_rows(), 5);
4226 assert_eq!(batch.num_columns(), 1);
4227 }
4228
4229 #[rstest]
4230 #[tokio::test]
4231 async fn test_knn_with_new_data(
4232 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4233 data_storage_version: LanceFileVersion,
4234 #[values(false, true)] stable_row_ids: bool,
4235 ) {
4236 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4237 .await
4238 .unwrap();
4239 test_ds.make_vector_index().await.unwrap();
4240 test_ds.append_new_data().await.unwrap();
4241 let dataset = &test_ds.dataset;
4242
4243 let key: Float32Array = [0f32; 32].into_iter().collect();
4245 let k = 20;
4248
4249 #[derive(Debug)]
4250 struct TestCase {
4251 filter: Option<&'static str>,
4252 limit: Option<i64>,
4253 use_index: bool,
4254 }
4255
4256 let mut cases = vec![];
4257 for filter in [Some("i > 100"), None] {
4258 for limit in [None, Some(10)] {
4259 for use_index in [true, false] {
4260 cases.push(TestCase {
4261 filter,
4262 limit,
4263 use_index,
4264 });
4265 }
4266 }
4267 }
4268
4269 for case in cases {
4271 let mut scanner = dataset.scan();
4272 scanner
4273 .nearest("vec", &key, k)
4274 .unwrap()
4275 .limit(case.limit, None)
4276 .unwrap()
4277 .refine(3)
4278 .use_index(case.use_index);
4279 if let Some(filter) = case.filter {
4280 scanner.filter(filter).unwrap();
4281 }
4282
4283 let result = scanner
4284 .try_into_stream()
4285 .await
4286 .unwrap()
4287 .try_collect::<Vec<_>>()
4288 .await
4289 .unwrap();
4290 assert!(!result.is_empty());
4291 let result = concat_batches(&result[0].schema(), result.iter()).unwrap();
4292
4293 if case.filter.is_some() {
4294 let result_rows = result.num_rows();
4295 let expected_rows = case.limit.unwrap_or(k as i64) as usize;
4296 assert!(
4297 result_rows <= expected_rows,
4298                     "Expected at most {} rows, got {}",
4299 expected_rows,
4300 result_rows
4301 );
4302 } else {
4303 assert_eq!(result.num_rows(), case.limit.unwrap_or(k as i64) as usize);
4305 }
4306
4307 assert_eq!(
4309 as_primitive_array::<Int32Type>(result.column(0).as_ref()).value(0),
4310 400
4311 );
4312 }
4313 }
4314
4315 #[rstest]
4316 #[tokio::test]
4317 async fn test_knn_with_prefilter(
4318 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4319 data_storage_version: LanceFileVersion,
4320 #[values(false, true)] stable_row_ids: bool,
4321 ) {
4322 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4323 .await
4324 .unwrap();
4325 test_ds.make_vector_index().await.unwrap();
4326 let dataset = &test_ds.dataset;
4327
4328 let mut scan = dataset.scan();
4329 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4330 scan.filter("i > 100").unwrap();
4331 scan.prefilter(true);
4332 scan.project(&["i", "vec"]).unwrap();
4333 scan.nearest("vec", &key, 5).unwrap();
4334 scan.use_index(false);
4335
4336 let results = scan
4337 .try_into_stream()
4338 .await
4339 .unwrap()
4340 .try_collect::<Vec<_>>()
4341 .await
4342 .unwrap();
4343
4344 assert_eq!(results.len(), 1);
4345 let batch = &results[0];
4346
4347 assert_eq!(batch.num_rows(), 5);
4348 assert_eq!(
4349 batch.schema().as_ref(),
4350 &ArrowSchema::new(vec![
4351 ArrowField::new("i", DataType::Int32, true),
4352 ArrowField::new(
4353 "vec",
4354 DataType::FixedSizeList(
4355 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4356 32,
4357 ),
4358 true,
4359 ),
4360 ArrowField::new(DIST_COL, DataType::Float32, true),
4361 ])
4362 .with_metadata([("dataset".into(), "vector".into())].into())
4363 );
4364
4365 let exact_i = BTreeSet::from_iter(vec![161, 241, 321]);
4367 let close_i = BTreeSet::from_iter(vec![161, 241, 321, 160, 162, 240, 242, 320, 322]);
4369 let column_i = batch.column_by_name("i").unwrap();
4370 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4371 .values()
4372 .iter()
4373 .copied()
4374 .collect();
4375 assert!(exact_i.is_subset(&actual_i));
4376 assert!(actual_i.is_subset(&close_i));
4377 }
4378
4379 #[rstest]
4380 #[tokio::test]
4381 async fn test_knn_filter_new_data(
4382 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4383 data_storage_version: LanceFileVersion,
4384 #[values(false, true)] stable_row_ids: bool,
4385 ) {
4386 let mut test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4390 .await
4391 .unwrap();
4392 test_ds.make_vector_index().await.unwrap();
4393 test_ds.append_new_data().await.unwrap();
4394 let dataset = &test_ds.dataset;
4395
4396 let key: Float32Array = [0f32; 32].into_iter().collect();
4398
4399 let mut query = dataset.scan();
4400 query.nearest("vec", &key, 20).unwrap();
4401
4402 let results = query
4404 .try_into_stream()
4405 .await
4406 .unwrap()
4407 .try_collect::<Vec<_>>()
4408 .await
4409 .unwrap();
4410
4411 let results_i = results[0]["i"]
4412 .as_primitive::<Int32Type>()
4413 .values()
4414 .iter()
4415 .copied()
4416 .collect::<BTreeSet<_>>();
4417
4418 assert!(results_i.contains(&400));
4419
4420 for prefilter in [false, true] {
4422 let mut query = dataset.scan();
4423 query
4424 .filter("i != 400")
4425 .unwrap()
4426 .prefilter(prefilter)
4427 .nearest("vec", &key, 20)
4428 .unwrap();
4429
4430 let results = query
4431 .try_into_stream()
4432 .await
4433 .unwrap()
4434 .try_collect::<Vec<_>>()
4435 .await
4436 .unwrap();
4437
4438 let results_i = results[0]["i"]
4439 .as_primitive::<Int32Type>()
4440 .values()
4441 .iter()
4442 .copied()
4443 .collect::<BTreeSet<_>>();
4444
4445 assert!(!results_i.contains(&400));
4446 }
4447 }
4448
4449 #[rstest]
4450 #[tokio::test]
4451 async fn test_knn_with_filter(
4452 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4453 data_storage_version: LanceFileVersion,
4454 #[values(false, true)] stable_row_ids: bool,
4455 ) {
4456 let test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4457 .await
4458 .unwrap();
4459 let dataset = &test_ds.dataset;
4460
4461 let mut scan = dataset.scan();
4462 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4463 scan.nearest("vec", &key, 5).unwrap();
4464 scan.filter("i > 100").unwrap();
4465 scan.project(&["i", "vec"]).unwrap();
4466 scan.refine(5);
4467
4468 let results = scan
4469 .try_into_stream()
4470 .await
4471 .unwrap()
4472 .try_collect::<Vec<_>>()
4473 .await
4474 .unwrap();
4475
4476 assert_eq!(results.len(), 1);
4477 let batch = &results[0];
4478
4479 assert_eq!(batch.num_rows(), 3);
4480 assert_eq!(
4481 batch.schema().as_ref(),
4482 &ArrowSchema::new(vec![
4483 ArrowField::new("i", DataType::Int32, true),
4484 ArrowField::new(
4485 "vec",
4486 DataType::FixedSizeList(
4487 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4488 32,
4489 ),
4490 true,
4491 ),
4492 ArrowField::new(DIST_COL, DataType::Float32, true),
4493 ])
4494 .with_metadata([("dataset".into(), "vector".into())].into())
4495 );
4496
4497 let expected_i = BTreeSet::from_iter(vec![161, 241, 321]);
4498 let column_i = batch.column_by_name("i").unwrap();
4499 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4500 .values()
4501 .iter()
4502 .copied()
4503 .collect();
4504 assert_eq!(expected_i, actual_i);
4505 }
4506
4507 #[rstest]
4508 #[tokio::test]
4509 async fn test_refine_factor(
4510 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4511 data_storage_version: LanceFileVersion,
4512 #[values(false, true)] stable_row_ids: bool,
4513 ) {
4514 let test_ds = TestVectorDataset::new(data_storage_version, stable_row_ids)
4515 .await
4516 .unwrap();
4517 let dataset = &test_ds.dataset;
4518
4519 let mut scan = dataset.scan();
4520 let key: Float32Array = (32..64).map(|v| v as f32).collect();
4521 scan.nearest("vec", &key, 5).unwrap();
4522 scan.refine(5);
4523
4524 let results = scan
4525 .try_into_stream()
4526 .await
4527 .unwrap()
4528 .try_collect::<Vec<_>>()
4529 .await
4530 .unwrap();
4531
4532 assert_eq!(results.len(), 1);
4533 let batch = &results[0];
4534
4535 assert_eq!(batch.num_rows(), 5);
4536 assert_eq!(
4537 batch.schema().as_ref(),
4538 &ArrowSchema::new(vec![
4539 ArrowField::new("i", DataType::Int32, true),
4540 ArrowField::new("s", DataType::Utf8, true),
4541 ArrowField::new(
4542 "vec",
4543 DataType::FixedSizeList(
4544 Arc::new(ArrowField::new("item", DataType::Float32, true)),
4545 32,
4546 ),
4547 true,
4548 ),
4549 ArrowField::new(DIST_COL, DataType::Float32, true),
4550 ])
4551 .with_metadata([("dataset".into(), "vector".into())].into())
4552 );
4553
4554 let expected_i = BTreeSet::from_iter(vec![1, 81, 161, 241, 321]);
4555 let column_i = batch.column_by_name("i").unwrap();
4556 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
4557 .values()
4558 .iter()
4559 .copied()
4560 .collect();
4561 assert_eq!(expected_i, actual_i);
4562 }
4563
4564 #[rstest]
4565 #[tokio::test]
4566 async fn test_only_row_id(
4567 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4568 data_storage_version: LanceFileVersion,
4569 ) {
4570 let test_ds = TestVectorDataset::new(data_storage_version, false)
4571 .await
4572 .unwrap();
4573 let dataset = &test_ds.dataset;
4574
4575 let mut scan = dataset.scan();
4576 scan.project::<&str>(&[]).unwrap().with_row_id();
4577
4578 let batch = scan.try_into_batch().await.unwrap();
4579
4580 assert_eq!(batch.num_columns(), 1);
4581 assert_eq!(batch.num_rows(), 400);
4582 let expected_schema =
4583 ArrowSchema::new(vec![ArrowField::new(ROW_ID, DataType::UInt64, true)])
4584 .with_metadata(dataset.schema().metadata.clone());
4585 assert_eq!(batch.schema().as_ref(), &expected_schema,);
4586
4587 let expected_row_ids: Vec<u64> = (0..200_u64).chain((1 << 32)..((1 << 32) + 200)).collect();
4588 let actual_row_ids: Vec<u64> = as_primitive_array::<UInt64Type>(batch.column(0).as_ref())
4589 .values()
4590 .iter()
4591 .copied()
4592 .collect();
4593 assert_eq!(expected_row_ids, actual_row_ids);
4594 }
4595
4596 #[tokio::test]
4597 async fn test_scan_unordered_with_row_id() {
4598 let test_ds = TestVectorDataset::new(LanceFileVersion::Legacy, false)
4600 .await
4601 .unwrap();
4602 let dataset = &test_ds.dataset;
4603
4604 let mut scan = dataset.scan();
4605 scan.with_row_id();
4606
4607 let ordered_batches = scan
4608 .try_into_stream()
4609 .await
4610 .unwrap()
4611 .try_collect::<Vec<RecordBatch>>()
4612 .await
4613 .unwrap();
4614 assert!(ordered_batches.len() > 2);
4615 let ordered_batch =
4616 concat_batches(&ordered_batches[0].schema(), ordered_batches.iter()).unwrap();
4617
4618 scan.scan_in_order(false);
4620 for _ in 0..10 {
4621 let unordered_batches = scan
4622 .try_into_stream()
4623 .await
4624 .unwrap()
4625 .try_collect::<Vec<RecordBatch>>()
4626 .await
4627 .unwrap();
4628 let unordered_batch =
4629 concat_batches(&unordered_batches[0].schema(), unordered_batches.iter()).unwrap();
4630
4631 assert_eq!(ordered_batch.num_rows(), unordered_batch.num_rows());
4632
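// When the unordered result differs, sorting it by row id should give back the same
// values as the ordered scan.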
4633 if ordered_batch != unordered_batch {
4635 let sort_indices = sort_to_indices(&unordered_batch[ROW_ID], None, None).unwrap();
4636
4637 let ordered_i = ordered_batch["i"].clone();
4638 let sorted_i = take::take(&unordered_batch["i"], &sort_indices, None).unwrap();
4639
4640 assert_eq!(&ordered_i, &sorted_i);
4641
4642 break;
4643 }
4644 }
4645 }
4646
4647 #[rstest]
4648 #[tokio::test]
4649 async fn test_scan_order(
4650 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4651 data_storage_version: LanceFileVersion,
4652 ) {
4653 let test_dir = TempStrDir::default();
4654 let test_uri = &test_dir;
4655
4656 let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
4657 "i",
4658 DataType::Int32,
4659 true,
4660 )]));
4661
4662 let batch1 = RecordBatch::try_new(
4663 schema.clone(),
4664 vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
4665 )
4666 .unwrap();
4667
4668 let batch2 = RecordBatch::try_new(
4669 schema.clone(),
4670 vec![Arc::new(Int32Array::from(vec![6, 7, 8]))],
4671 )
4672 .unwrap();
4673
4674 let params = WriteParams {
4675 mode: WriteMode::Append,
4676 data_storage_version: Some(data_storage_version),
4677 ..Default::default()
4678 };
4679
4680 let write_batch = |batch: RecordBatch| async {
4681 let reader = RecordBatchIterator::new(vec![Ok(batch)], schema.clone());
4682 Dataset::write(reader, test_uri, Some(params)).await
4683 };
4684
4685 write_batch.clone()(batch1.clone()).await.unwrap();
4686 write_batch(batch2.clone()).await.unwrap();
4687
4688 let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());
4689 let fragment1 = dataset.get_fragment(0).unwrap().metadata().clone();
4690 let fragment2 = dataset.get_fragment(1).unwrap().metadata().clone();
4691
4692 let mut scanner = dataset.scan();
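// Scanning the fragments in their written order should return batch1 then batch2.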
4694 scanner.with_fragments(vec![fragment1.clone(), fragment2.clone()]);
4695 let output = scanner
4696 .try_into_stream()
4697 .await
4698 .unwrap()
4699 .try_collect::<Vec<_>>()
4700 .await
4701 .unwrap();
4702 assert_eq!(output.len(), 2);
4703 assert_eq!(output[0], batch1);
4704 assert_eq!(output[1], batch2);
4705
4706 let mut scanner = dataset.scan();
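// Reversing the fragment order should reverse the order of the returned batches.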
4708 scanner.with_fragments(vec![fragment2, fragment1]);
4709 let output = scanner
4710 .try_into_stream()
4711 .await
4712 .unwrap()
4713 .try_collect::<Vec<_>>()
4714 .await
4715 .unwrap();
4716 assert_eq!(output.len(), 2);
4717 assert_eq!(output[0], batch2);
4718 assert_eq!(output[1], batch1);
4719 }
4720
4721 #[rstest]
4722 #[tokio::test]
4723 async fn test_scan_sort(
4724 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4725 data_storage_version: LanceFileVersion,
4726 ) {
4727 let test_dir = TempStrDir::default();
4728 let test_uri = &test_dir;
4729
4730 let data = gen_batch()
4731 .col("int", array::cycle::<Int32Type>(vec![5, 4, 1, 2, 3]))
4732 .col(
4733 "str",
4734 array::cycle_utf8_literals(&["a", "b", "c", "e", "d"]),
4735 );
4736
4737 let sorted_by_int = gen_batch()
4738 .col("int", array::cycle::<Int32Type>(vec![1, 2, 3, 4, 5]))
4739 .col(
4740 "str",
4741 array::cycle_utf8_literals(&["c", "e", "d", "b", "a"]),
4742 )
4743 .into_batch_rows(RowCount::from(5))
4744 .unwrap();
4745
4746 let sorted_by_str = gen_batch()
4747 .col("int", array::cycle::<Int32Type>(vec![5, 4, 1, 3, 2]))
4748 .col(
4749 "str",
4750 array::cycle_utf8_literals(&["a", "b", "c", "d", "e"]),
4751 )
4752 .into_batch_rows(RowCount::from(5))
4753 .unwrap();
4754
4755 Dataset::write(
4756 data.into_reader_rows(RowCount::from(5), BatchCount::from(1)),
4757 test_uri,
4758 Some(WriteParams {
4759 data_storage_version: Some(data_storage_version),
4760 ..Default::default()
4761 }),
4762 )
4763 .await
4764 .unwrap();
4765
4766 let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());
4767
4768 let batches_by_int = dataset
4769 .scan()
4770 .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
4771 "int".to_string(),
4772 )]))
4773 .unwrap()
4774 .try_into_stream()
4775 .await
4776 .unwrap()
4777 .try_collect::<Vec<_>>()
4778 .await
4779 .unwrap();
4780
4781 assert_eq!(batches_by_int[0], sorted_by_int);
4782
4783 let batches_by_str = dataset
4784 .scan()
4785 .order_by(Some(vec![ColumnOrdering::asc_nulls_first(
4786 "str".to_string(),
4787 )]))
4788 .unwrap()
4789 .try_into_stream()
4790 .await
4791 .unwrap()
4792 .try_collect::<Vec<_>>()
4793 .await
4794 .unwrap();
4795
4796 assert_eq!(batches_by_str[0], sorted_by_str);
4797
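// An empty ordering list is accepted and simply leaves the scan unsorted.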
4798 dataset
4800 .scan()
4801 .order_by(Some(vec![]))
4802 .unwrap()
4803 .try_into_stream()
4804 .await
4805 .unwrap()
4806 .try_collect::<Vec<_>>()
4807 .await
4808 .unwrap();
4809 }
4810
4811 #[rstest]
4812 #[tokio::test]
4813 async fn test_sort_multi_columns(
4814 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4815 data_storage_version: LanceFileVersion,
4816 ) {
4817 let test_dir = TempStrDir::default();
4818 let test_uri = &test_dir;
4819
4820 let data = gen_batch()
4821 .col("int", array::cycle::<Int32Type>(vec![5, 5, 1, 1, 3]))
4822 .col(
4823 "float",
4824 array::cycle::<Float32Type>(vec![7.3, -f32::NAN, f32::NAN, 4.3, f32::INFINITY]),
4825 );
4826
4827 let sorted_by_int_then_float = gen_batch()
4828 .col("int", array::cycle::<Int32Type>(vec![1, 1, 3, 5, 5]))
4829 .col(
4830 "float",
4831 array::cycle::<Float32Type>(vec![4.3, f32::NAN, f32::INFINITY, -f32::NAN, 7.3]),
4833 )
4834 .into_batch_rows(RowCount::from(5))
4835 .unwrap();
4836
4837 Dataset::write(
4838 data.into_reader_rows(RowCount::from(5), BatchCount::from(1)),
4839 test_uri,
4840 Some(WriteParams {
4841 data_storage_version: Some(data_storage_version),
4842 ..Default::default()
4843 }),
4844 )
4845 .await
4846 .unwrap();
4847
4848 let dataset = Arc::new(Dataset::open(test_uri).await.unwrap());
4849
4850 let batches_by_int_then_float = dataset
4851 .scan()
4852 .order_by(Some(vec![
4853 ColumnOrdering::asc_nulls_first("int".to_string()),
4854 ColumnOrdering::asc_nulls_first("float".to_string()),
4855 ]))
4856 .unwrap()
4857 .try_into_stream()
4858 .await
4859 .unwrap()
4860 .try_collect::<Vec<_>>()
4861 .await
4862 .unwrap();
4863
4864 assert_eq!(batches_by_int_then_float[0], sorted_by_int_then_float);
4865 }
4866
4867 #[rstest]
4868 #[tokio::test]
4869 async fn test_ann_prefilter(
4870 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4871 data_storage_version: LanceFileVersion,
4872 #[values(false, true)] stable_row_ids: bool,
4873 #[values(
4874 VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2),
4875 VectorIndexParams::with_ivf_hnsw_sq_params(
4876 MetricType::L2,
4877 IvfBuildParams::new(2),
4878 HnswBuildParams::default(),
4879 SQBuildParams::default()
4880 )
4881 )]
4882 index_params: VectorIndexParams,
4883 ) {
4884 let test_dir = TempStrDir::default();
4885 let test_uri = &test_dir;
4886
4887 let schema = Arc::new(ArrowSchema::new(vec![
4888 ArrowField::new("filterable", DataType::Int32, true),
4889 ArrowField::new("vector", fixed_size_list_type(2, DataType::Float32), true),
4890 ]));
4891
4892 let vector_values = Float32Array::from_iter_values((0..600).map(|x| x as f32));
4893
4894 let batches = vec![RecordBatch::try_new(
4895 schema.clone(),
4896 vec![
4897 Arc::new(Int32Array::from_iter_values(0..300)),
4898 Arc::new(FixedSizeListArray::try_new_from_values(vector_values, 2).unwrap()),
4899 ],
4900 )
4901 .unwrap()];
4902
4903 let write_params = WriteParams {
4904 data_storage_version: Some(data_storage_version),
4905 max_rows_per_file: 300,
4906 enable_stable_row_ids: stable_row_ids,
4907 ..Default::default()
4908 };
4909 let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
4910 let mut dataset = Dataset::write(batches, test_uri, Some(write_params))
4911 .await
4912 .unwrap();
4913
4914 dataset
4915 .create_index(&["vector"], IndexType::Vector, None, &index_params, false)
4916 .await
4917 .unwrap();
4918
4919 let query_key = Arc::new(Float32Array::from_iter_values((0..2).map(|x| x as f32)));
4920 let mut scan = dataset.scan();
4921 scan.filter("filterable > 5").unwrap();
4922 scan.nearest("vector", query_key.as_ref(), 1).unwrap();
4923 scan.minimum_nprobes(100);
4924 scan.with_row_id();
4925
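// Without a prefilter the filter runs after the ANN search: the single nearest row has
// filterable = 0, so it is filtered out and nothing comes back.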
4926 let batches = scan
4927 .try_into_stream()
4928 .await
4929 .unwrap()
4930 .try_collect::<Vec<_>>()
4931 .await
4932 .unwrap();
4933
4934 assert_eq!(batches.len(), 0);
4935
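// With prefilter(true) the filter runs before the search, so the nearest row that
// satisfies filterable > 5 (row id 6) is returned.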
4936 scan.prefilter(true);
4937
4938 let batches = scan
4939 .try_into_stream()
4940 .await
4941 .unwrap()
4942 .try_collect::<Vec<_>>()
4943 .await
4944 .unwrap();
4945 assert_eq!(batches.len(), 1);
4946
4947 let first_match = batches[0][ROW_ID].as_primitive::<UInt64Type>().values()[0];
4948
4949 assert_eq!(6, first_match);
4950 }
4951
4952 #[rstest]
4953 #[tokio::test]
4954 async fn test_filter_on_large_utf8(
4955 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
4956 data_storage_version: LanceFileVersion,
4957 ) {
4958 let test_dir = TempStrDir::default();
4959 let test_uri = &test_dir;
4960
4961 let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
4962 "ls",
4963 DataType::LargeUtf8,
4964 true,
4965 )]));
4966
4967 let batches = vec![RecordBatch::try_new(
4968 schema.clone(),
4969 vec![Arc::new(LargeStringArray::from_iter_values(
4970 (0..10).map(|v| format!("s-{}", v)),
4971 ))],
4972 )
4973 .unwrap()];
4974
4975 let write_params = WriteParams {
4976 data_storage_version: Some(data_storage_version),
4977 ..Default::default()
4978 };
4979 let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
4980 Dataset::write(batches, test_uri, Some(write_params))
4981 .await
4982 .unwrap();
4983
4984 let dataset = Dataset::open(test_uri).await.unwrap();
4985 let mut scan = dataset.scan();
4986 scan.filter("ls = 's-8'").unwrap();
4987
4988 let batches = scan
4989 .try_into_stream()
4990 .await
4991 .unwrap()
4992 .try_collect::<Vec<_>>()
4993 .await
4994 .unwrap();
4995 let batch = &batches[0];
4996
4997 let expected = RecordBatch::try_new(
4998 schema.clone(),
4999 vec![Arc::new(LargeStringArray::from_iter_values(
5000 (8..9).map(|v| format!("s-{}", v)),
5001 ))],
5002 )
5003 .unwrap();
5004
5005 assert_eq!(batch, &expected);
5006 }
5007
5008 #[rstest]
5009 #[tokio::test]
5010 async fn test_filter_with_regex(
5011 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5012 data_storage_version: LanceFileVersion,
5013 ) {
5014 let test_dir = TempStrDir::default();
5015 let test_uri = &test_dir;
5016
5017 let schema = Arc::new(ArrowSchema::new(vec![ArrowField::new(
5018 "ls",
5019 DataType::Utf8,
5020 true,
5021 )]));
5022
5023 let batches = vec![RecordBatch::try_new(
5024 schema.clone(),
5025 vec![Arc::new(StringArray::from_iter_values(
5026 (0..20).map(|v| format!("s-{}", v)),
5027 ))],
5028 )
5029 .unwrap()];
5030
5031 let write_params = WriteParams {
5032 data_storage_version: Some(data_storage_version),
5033 ..Default::default()
5034 };
5035 let batches = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
5036 Dataset::write(batches, test_uri, Some(write_params))
5037 .await
5038 .unwrap();
5039
5040 let dataset = Dataset::open(test_uri).await.unwrap();
5041 let mut scan = dataset.scan();
5042 scan.filter("regexp_match(ls, 's-1.')").unwrap();
5043
5044 let stream = scan.try_into_stream().await.unwrap();
5045 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
5046 let batch = &batches[0];
5047
5048 let expected = RecordBatch::try_new(
5049 schema.clone(),
5050 vec![Arc::new(StringArray::from_iter_values(
5051 (10..=19).map(|v| format!("s-{}", v)),
5052 ))],
5053 )
5054 .unwrap();
5055
5056 assert_eq!(batch, &expected);
5057 }
5058
5059 #[tokio::test]
5060 async fn test_filter_proj_bug() {
5061 let struct_i_field = ArrowField::new("i", DataType::Int32, true);
5062 let struct_o_field = ArrowField::new("o", DataType::Utf8, true);
5063 let schema = Arc::new(ArrowSchema::new(vec![
5064 ArrowField::new(
5065 "struct",
5066 DataType::Struct(vec![struct_i_field.clone(), struct_o_field.clone()].into()),
5067 true,
5068 ),
5069 ArrowField::new("s", DataType::Utf8, true),
5070 ]));
5071
5072 let input_batches: Vec<RecordBatch> = (0..5)
5073 .map(|i| {
5074 let struct_i_arr: Arc<Int32Array> =
5075 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20));
5076 let struct_o_arr: Arc<StringArray> = Arc::new(StringArray::from_iter_values(
5077 (i * 20..(i + 1) * 20).map(|v| format!("o-{:02}", v)),
5078 ));
5079 RecordBatch::try_new(
5080 schema.clone(),
5081 vec![
5082 Arc::new(StructArray::from(vec![
5083 (Arc::new(struct_i_field.clone()), struct_i_arr as ArrayRef),
5084 (Arc::new(struct_o_field.clone()), struct_o_arr as ArrayRef),
5085 ])),
5086 Arc::new(StringArray::from_iter_values(
5087 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
5088 )),
5089 ],
5090 )
5091 .unwrap()
5092 })
5093 .collect();
5094 let batches =
5095 RecordBatchIterator::new(input_batches.clone().into_iter().map(Ok), schema.clone());
5096 let test_dir = TempStrDir::default();
5097 let test_uri = &test_dir;
5098 let write_params = WriteParams {
5099 max_rows_per_file: 40,
5100 max_rows_per_group: 10,
5101 data_storage_version: Some(LanceFileVersion::Legacy),
5102 ..Default::default()
5103 };
5104 Dataset::write(batches, test_uri, Some(write_params))
5105 .await
5106 .unwrap();
5107
5108 let dataset = Dataset::open(test_uri).await.unwrap();
5109 let batches = dataset
5110 .scan()
5111 .filter("struct.i >= 20")
5112 .unwrap()
5113 .try_into_stream()
5114 .await
5115 .unwrap()
5116 .try_collect::<Vec<_>>()
5117 .await
5118 .unwrap();
5119 let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
5120
5121 let expected_batch = concat_batches(&schema, &input_batches.as_slice()[1..]).unwrap();
5122 assert_eq!(batch, expected_batch);
5123
5124 let batches = dataset
5126 .scan()
5127 .filter("struct.o >= 'o-20'")
5128 .unwrap()
5129 .try_into_stream()
5130 .await
5131 .unwrap()
5132 .try_collect::<Vec<_>>()
5133 .await
5134 .unwrap();
5135 let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
5136 assert_eq!(batch, expected_batch);
5137
5138 let batches = dataset
5140 .scan()
5141 .project(vec!["struct"].as_slice())
5142 .unwrap()
5143 .try_into_stream()
5144 .await
5145 .unwrap()
5146 .try_collect::<Vec<_>>()
5147 .await
5148 .unwrap();
5149 concat_batches(&batches[0].schema(), &batches).unwrap();
5150 }
5151
5152 #[rstest]
5153 #[tokio::test]
5154 async fn test_ann_with_deletion(
5155 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5156 data_storage_version: LanceFileVersion,
5157 #[values(false, true)] stable_row_ids: bool,
5158 ) {
5159 let vec_params = vec![
5160 VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 2),
5163 ];
5164 for params in vec_params {
5165 let test_dir = TempStrDir::default();
5166 let test_uri = &test_dir;
5167
5168 let schema = Arc::new(ArrowSchema::new(vec![
5170 ArrowField::new("i", DataType::Int32, true),
5171 ArrowField::new(
5172 "vec",
5173 DataType::FixedSizeList(
5174 Arc::new(ArrowField::new("item", DataType::Float32, true)),
5175 32,
5176 ),
5177 true,
5178 ),
5179 ]));
5180
5181 let vector_values: Float32Array =
5183 (0..32 * 512).map(|v| (v / 32) as f32 + 1.0).collect();
5184 let vectors = FixedSizeListArray::try_new_from_values(vector_values, 32).unwrap();
5185
5186 let batches = vec![RecordBatch::try_new(
5187 schema.clone(),
5188 vec![
5189 Arc::new(Int32Array::from_iter_values(0..512)),
5190 Arc::new(vectors.clone()),
5191 ],
5192 )
5193 .unwrap()];
5194
5195 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
5196 let mut dataset = Dataset::write(
5197 reader,
5198 test_uri,
5199 Some(WriteParams {
5200 data_storage_version: Some(data_storage_version),
5201 enable_stable_row_ids: stable_row_ids,
5202 ..Default::default()
5203 }),
5204 )
5205 .await
5206 .unwrap();
5207
5208 assert_eq!(dataset.index_cache_entry_count().await, 0);
5209 dataset
5210 .create_index(
5211 &["vec"],
5212 IndexType::Vector,
5213 Some("idx".to_string()),
5214 ¶ms,
5215 true,
5216 )
5217 .await
5218 .unwrap();
5219
5220 let mut scan = dataset.scan();
5221 let key: Float32Array = (0..32).map(|_v| 1.0_f32).collect();
5223 scan.nearest("vec", &key, 5).unwrap();
5224 scan.refine(100);
5225 scan.minimum_nprobes(100);
5226
5227 assert_eq!(
5228 dataset.index_cache_entry_count().await,
5229 2,
5230 );
5231 let results = scan
5232 .try_into_stream()
5233 .await
5234 .unwrap()
5235 .try_collect::<Vec<_>>()
5236 .await
5237 .unwrap();
5238
5239 assert_eq!(
5240 dataset.index_cache_entry_count().await,
5241 5 + dataset.versions().await.unwrap().len()
5242 );
5243 assert_eq!(results.len(), 1);
5244 let batch = &results[0];
5245
5246 let expected_i = BTreeSet::from_iter(vec![0, 1, 2, 3, 4]);
5247 let column_i = batch.column_by_name("i").unwrap();
5248 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
5249 .values()
5250 .iter()
5251 .copied()
5252 .collect();
5253 assert_eq!(expected_i, actual_i);
5254
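// Delete one of the top results; re-running the search should return the next closest
// row (i = 5) in its place.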
5255 dataset.delete("i = 1").await.unwrap();
5258 let mut scan = dataset.scan();
5259 scan.nearest("vec", &key, 5).unwrap();
5260 scan.refine(100);
5261 scan.minimum_nprobes(100);
5262
5263 let results = scan
5264 .try_into_stream()
5265 .await
5266 .unwrap()
5267 .try_collect::<Vec<_>>()
5268 .await
5269 .unwrap();
5270
5271 assert_eq!(results.len(), 1);
5272 let batch = &results[0];
5273
5274 let expected_i = BTreeSet::from_iter(vec![0, 2, 3, 4, 5]);
5276 let column_i = batch.column_by_name("i").unwrap();
5277 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
5278 .values()
5279 .iter()
5280 .copied()
5281 .collect();
5282 assert_eq!(expected_i, actual_i);
5283
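// Append 512 more rows that reuse the same vectors, rebuild the index, then delete all
// of the original rows; the search should now only return rows from the appended data.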
5284 let batches = vec![RecordBatch::try_new(
5287 schema.clone(),
5288 vec![
5289 Arc::new(Int32Array::from_iter_values(512..1024)),
5290 Arc::new(vectors),
5291 ],
5292 )
5293 .unwrap()];
5294
5295 let reader = RecordBatchIterator::new(batches.into_iter().map(Ok), schema.clone());
5296 let mut dataset = Dataset::write(
5297 reader,
5298 test_uri,
5299 Some(WriteParams {
5300 mode: WriteMode::Append,
5301 data_storage_version: Some(data_storage_version),
5302 ..Default::default()
5303 }),
5304 )
5305 .await
5306 .unwrap();
5307 dataset
5308 .create_index(
5309 &["vec"],
5310 IndexType::Vector,
5311 Some("idx".to_string()),
5312 ¶ms,
5313 true,
5314 )
5315 .await
5316 .unwrap();
5317
5318 dataset.delete("i < 512").await.unwrap();
5319
5320 let mut scan = dataset.scan();
5321 scan.nearest("vec", &key, 5).unwrap();
5322 scan.refine(100);
5323 scan.minimum_nprobes(100);
5324
5325 let results = scan
5326 .try_into_stream()
5327 .await
5328 .unwrap()
5329 .try_collect::<Vec<_>>()
5330 .await
5331 .unwrap();
5332
5333 assert_eq!(results.len(), 1);
5334 let batch = &results[0];
5335
5336 let expected_i = BTreeSet::from_iter(vec![512, 513, 514, 515, 516]);
5338 let column_i = batch.column_by_name("i").unwrap();
5339 let actual_i: BTreeSet<i32> = as_primitive_array::<Int32Type>(column_i.as_ref())
5340 .values()
5341 .iter()
5342 .copied()
5343 .collect();
5344 assert_eq!(expected_i, actual_i);
5345 }
5346 }
5347
5348 #[tokio::test]
5349 async fn test_projection_order() {
5350 let vec_params = VectorIndexParams::ivf_pq(4, 8, 2, MetricType::L2, 2);
5351 let mut data = gen_batch()
5352 .col("vec", array::rand_vec::<Float32Type>(Dimension::from(4)))
5353 .col("text", array::rand_utf8(ByteCount::from(10), false))
5354 .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(100))
5355 .await
5356 .unwrap();
5357 data.create_index(&["vec"], IndexType::Vector, None, &vec_params, true)
5358 .await
5359 .unwrap();
5360
5361 let mut scan = data.scan();
5362 scan.nearest("vec", &Float32Array::from(vec![1.0, 1.0, 1.0, 1.0]), 5)
5363 .unwrap();
5364 scan.with_row_id().project(&["text"]).unwrap();
5365
5366 let results = scan
5367 .try_into_stream()
5368 .await
5369 .unwrap()
5370 .try_collect::<Vec<_>>()
5371 .await
5372 .unwrap();
5373
5374 assert_eq!(
5375 results[0].schema().field_names(),
5376 vec!["text", "_distance", "_rowid"]
5377 );
5378 }
5379
5380 #[rstest]
5381 #[tokio::test]
5382 async fn test_count_rows_with_filter(
5383 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5384 data_storage_version: LanceFileVersion,
5385 ) {
5386 let test_dir = TempStrDir::default();
5387 let test_uri = &test_dir;
5388 let mut data_gen = BatchGenerator::new().col(Box::new(
5389 IncrementingInt32::new().named("Filter_me".to_owned()),
5390 ));
5391 Dataset::write(
5392 data_gen.batch(32),
5393 test_uri,
5394 Some(WriteParams {
5395 data_storage_version: Some(data_storage_version),
5396 ..Default::default()
5397 }),
5398 )
5399 .await
5400 .unwrap();
5401
5402 let dataset = Dataset::open(test_uri).await.unwrap();
5403 assert_eq!(32, dataset.count_rows(None).await.unwrap());
5404 assert_eq!(
5405 16,
5406 dataset
5407 .count_rows(Some("`Filter_me` > 15".to_string()))
5408 .await
5409 .unwrap()
5410 );
5411 }
5412
5413 #[rstest]
5414 #[tokio::test]
5415 async fn test_dynamic_projection(
5416 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5417 data_storage_version: LanceFileVersion,
5418 ) {
5419 let test_dir = TempStrDir::default();
5420 let test_uri = &test_dir;
5421 let mut data_gen =
5422 BatchGenerator::new().col(Box::new(IncrementingInt32::new().named("i".to_owned())));
5423 Dataset::write(
5424 data_gen.batch(32),
5425 test_uri,
5426 Some(WriteParams {
5427 data_storage_version: Some(data_storage_version),
5428 ..Default::default()
5429 }),
5430 )
5431 .await
5432 .unwrap();
5433
5434 let dataset = Dataset::open(test_uri).await.unwrap();
5435 assert_eq!(dataset.count_rows(None).await.unwrap(), 32);
5436
5437 let mut scanner = dataset.scan();
5438
5439 let scan_res = scanner
5440 .project_with_transform(&[("bool", "i > 15")])
5441 .unwrap()
5442 .try_into_batch()
5443 .await
5444 .unwrap();
5445
5446 assert_eq!(1, scan_res.num_columns());
5447
5448 let bool_col = scan_res
5449 .column_by_name("bool")
5450 .expect("bool column should exist");
5451 let bool_arr = bool_col.as_boolean();
5452 for i in 0..32 {
5453 if i > 15 {
5454 assert!(bool_arr.value(i));
5455 } else {
5456 assert!(!bool_arr.value(i));
5457 }
5458 }
5459 }
5460
5461 #[rstest]
5462 #[tokio::test]
5463 async fn test_column_casting_function(
5464 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5465 data_storage_version: LanceFileVersion,
5466 ) {
5467 let test_dir = TempStrDir::default();
5468 let test_uri = &test_dir;
5469 let mut data_gen =
5470 BatchGenerator::new().col(Box::new(RandomVector::new().named("vec".to_owned())));
5471 Dataset::write(
5472 data_gen.batch(32),
5473 test_uri,
5474 Some(WriteParams {
5475 data_storage_version: Some(data_storage_version),
5476 ..Default::default()
5477 }),
5478 )
5479 .await
5480 .unwrap();
5481
5482 let dataset = Dataset::open(test_uri).await.unwrap();
5483 assert_eq!(dataset.count_rows(None).await.unwrap(), 32);
5484
5485 let mut scanner = dataset.scan();
5486
5487 let scan_res = scanner
5488 .project_with_transform(&[("f16", "_cast_list_f16(vec)")])
5489 .unwrap()
5490 .try_into_batch()
5491 .await
5492 .unwrap();
5493
5494 assert_eq!(1, scan_res.num_columns());
5495 assert_eq!(32, scan_res.num_rows());
5496 assert_eq!("f16", scan_res.schema().field(0).name());
5497
5498 let mut scanner = dataset.scan();
5499 let scan_res_original = scanner
5500 .project(&["vec"])
5501 .unwrap()
5502 .try_into_batch()
5503 .await
5504 .unwrap();
5505
5506 let f32_col: &Float32Array = scan_res_original
5507 .column_by_name("vec")
5508 .unwrap()
5509 .as_fixed_size_list()
5510 .values()
5511 .as_primitive();
5512 let f16_col: &Float16Array = scan_res
5513 .column_by_name("f16")
5514 .unwrap()
5515 .as_fixed_size_list()
5516 .values()
5517 .as_primitive();
5518
5519 for (f32_val, f16_val) in f32_col.iter().zip(f16_col.iter()) {
5520 let f32_val = f32_val.unwrap();
5521 let f16_val = f16_val.unwrap();
5522 assert_eq!(f16::from_f32(f32_val), f16_val);
5523 }
5524 }
5525
5526 struct ScalarIndexTestFixture {
5527 _test_dir: TempStrDir,
5528 dataset: Dataset,
5529 sample_query: Arc<dyn Array>,
5530 delete_query: Arc<dyn Array>,
5531 original_version: u64,
5533 compact_version: u64,
5535 append_version: u64,
5537 updated_version: u64,
5539 delete_version: u64,
5541 append_then_delete_version: u64,
5543 }
5544
5545 #[derive(Debug, PartialEq)]
5546 struct ScalarTestParams {
5547 use_index: bool,
5548 use_projection: bool,
5549 use_deleted_data: bool,
5550 use_new_data: bool,
5551 with_row_id: bool,
5552 use_compaction: bool,
5553 use_updated: bool,
5554 }
5555
5556 impl ScalarIndexTestFixture {
5557 async fn new(data_storage_version: LanceFileVersion, use_stable_row_ids: bool) -> Self {
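// Builds a 1000-row dataset (vector + indexed + not_indexed columns, split across two
// data files), creates a vector index and a scalar index, and then records several
// versions (append, index-optimize, append-then-delete, delete, compact) for the
// scalar-index tests to check out individually.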
5558 let test_dir = TempStrDir::default();
5559 let test_uri = &test_dir;
5560
5561 let data = gen_batch()
5567 .col(
5568 "vector",
5569 array::rand_vec::<Float32Type>(Dimension::from(32)),
5570 )
5571 .col("indexed", array::step::<Int32Type>())
5572 .col("not_indexed", array::step::<Int32Type>())
5573 .into_batch_rows(RowCount::from(1000))
5574 .unwrap();
5575
5576 let mut dataset = Dataset::write(
5578 RecordBatchIterator::new(vec![Ok(data.clone())], data.schema().clone()),
5579 test_uri,
5580 Some(WriteParams {
5581 max_rows_per_file: 500,
5582 data_storage_version: Some(data_storage_version),
5583 enable_stable_row_ids: use_stable_row_ids,
5584 ..Default::default()
5585 }),
5586 )
5587 .await
5588 .unwrap();
5589
5590 dataset
5591 .create_index(
5592 &["vector"],
5593 IndexType::Vector,
5594 None,
5595 &VectorIndexParams::ivf_pq(2, 8, 2, MetricType::L2, 2),
5596 false,
5597 )
5598 .await
5599 .unwrap();
5600
5601 dataset
5602 .create_index(
5603 &["indexed"],
5604 IndexType::Scalar,
5605 None,
5606 &ScalarIndexParams::default(),
5607 false,
5608 )
5609 .await
5610 .unwrap();
5611
5612 let original_version = dataset.version().version;
5613 let sample_query = data["vector"].as_fixed_size_list().value(50);
5614 let delete_query = data["vector"].as_fixed_size_list().value(75);
5615
5616 let new_indexed =
5621 arrow_arith::numeric::add(&data["indexed"], &Int32Array::new_scalar(1000)).unwrap();
5622 let new_not_indexed =
5623 arrow_arith::numeric::add(&data["indexed"], &Int32Array::new_scalar(1000)).unwrap();
5624 let append_data = RecordBatch::try_new(
5625 data.schema(),
5626 vec![data["vector"].clone(), new_indexed, new_not_indexed],
5627 )
5628 .unwrap();
5629
5630 dataset
5631 .append(
5632 RecordBatchIterator::new(vec![Ok(append_data)], data.schema()),
5633 Some(WriteParams {
5634 data_storage_version: Some(data_storage_version),
5635 ..Default::default()
5636 }),
5637 )
5638 .await
5639 .unwrap();
5640
5641 let append_version = dataset.version().version;
5642
5643 dataset.optimize_indices(&Default::default()).await.unwrap();
5646 let updated_version = dataset.version().version;
5647
5651 let mut dataset = dataset.checkout_version(append_version).await.unwrap();
5651 dataset.restore().await.unwrap();
5652
5653 dataset.delete("not_indexed = 75").await.unwrap();
5654
5655 let append_then_delete_version = dataset.version().version;
5656
5657 let mut dataset = dataset.checkout_version(original_version).await.unwrap();
5660 dataset.restore().await.unwrap();
5661
5662 dataset.delete("not_indexed = 75").await.unwrap();
5663
5664 let delete_version = dataset.version().version;
5665
5666 compact_files(&mut dataset, CompactionOptions::default(), None)
5668 .await
5669 .unwrap();
5670 let compact_version = dataset.version().version;
5671 let mut dataset = dataset.checkout_version(original_version).await.unwrap();
5672 dataset.restore().await.unwrap();
5673
5674 Self {
5675 _test_dir: test_dir,
5676 dataset,
5677 sample_query,
5678 delete_query,
5679 original_version,
5680 compact_version,
5681 append_version,
5682 updated_version,
5683 delete_version,
5684 append_then_delete_version,
5685 }
5686 }
5687
5688 fn sample_query(&self) -> &PrimitiveArray<Float32Type> {
5689 self.sample_query.as_primitive::<Float32Type>()
5690 }
5691
5692 fn delete_query(&self) -> &PrimitiveArray<Float32Type> {
5693 self.delete_query.as_primitive::<Float32Type>()
5694 }
5695
5696 async fn get_dataset(&self, params: &ScalarTestParams) -> Dataset {
5697 let version = if params.use_compaction {
5698 if params.use_deleted_data || params.use_new_data || params.use_updated {
5700 panic!(
5701 "There is no test data combining new/deleted/updated data with compaction"
5702 );
5703 } else {
5704 self.compact_version
5705 }
5706 } else if params.use_updated {
5707 if params.use_deleted_data || params.use_new_data || params.use_compaction {
5709 panic!(
5710 "There is no test data combining updated data with new/deleted/compaction"
5711 );
5712 } else {
5713 self.updated_version
5714 }
5715 } else {
5716 match (params.use_new_data, params.use_deleted_data) {
5717 (false, false) => self.original_version,
5718 (false, true) => self.delete_version,
5719 (true, false) => self.append_version,
5720 (true, true) => self.append_then_delete_version,
5721 }
5722 };
5723 self.dataset.checkout_version(version).await.unwrap()
5724 }
5725
5726 async fn run_query(
5727 &self,
5728 query: &str,
5729 vector: Option<&PrimitiveArray<Float32Type>>,
5730 params: &ScalarTestParams,
5731 ) -> (String, RecordBatch) {
5732 let dataset = self.get_dataset(params).await;
5733 let mut scan = dataset.scan();
5734 if let Some(vector) = vector {
5735 scan.nearest("vector", vector, 10).unwrap();
5736 }
5737 if params.use_projection {
5738 scan.project(&["indexed"]).unwrap();
5739 }
5740 if params.with_row_id {
5741 scan.with_row_id();
5742 }
5743 scan.scan_in_order(true);
5744 scan.use_index(params.use_index);
5745 scan.filter(query).unwrap();
5746 scan.prefilter(true);
5747
5748 let plan = scan.explain_plan(true).await.unwrap();
5749 let batch = scan.try_into_batch().await.unwrap();
5750
5751 if params.use_projection {
5752 let mut expected_columns = 1;
5754 if vector.is_some() {
5755 expected_columns += 1;
5757 }
5758 if params.with_row_id {
5759 expected_columns += 1;
5760 }
5761 assert_eq!(batch.num_columns(), expected_columns);
5762 } else {
5763 let mut expected_columns = 3;
5764 if vector.is_some() {
5765 expected_columns += 1;
5767 }
5768 if params.with_row_id {
5769 expected_columns += 1;
5770 }
5771 assert_eq!(batch.num_columns(), expected_columns);
5773 }
5774
5775 (plan, batch)
5776 }
5777
5778 fn assert_none<F: Fn(i32) -> bool>(
5779 &self,
5780 batch: &RecordBatch,
5781 predicate: F,
5782 message: &str,
5783 ) {
5784 let indexed = batch["indexed"].as_primitive::<Int32Type>();
5785 if indexed.iter().map(|val| val.unwrap()).any(predicate) {
5786 panic!("{}", message);
5787 }
5788 }
5789
5790 fn assert_one<F: Fn(i32) -> bool>(&self, batch: &RecordBatch, predicate: F, message: &str) {
5791 let indexed = batch["indexed"].as_primitive::<Int32Type>();
5792 if !indexed.iter().map(|val| val.unwrap()).any(predicate) {
5793 panic!("{}", message);
5794 }
5795 }
5796
5797 async fn check_vector_scalar_indexed_and_refine(&self, params: &ScalarTestParams) {
5798 let (query_plan, batch) = self
5799 .run_query(
5800 "indexed != 50 AND ((not_indexed < 100) OR (not_indexed >= 1000 AND not_indexed < 1100))",
5801 Some(self.sample_query()),
5802 params,
5803 )
5804 .await;
5805 if self.dataset.is_legacy_storage() {
5807 assert!(query_plan.contains("MaterializeIndex"));
5808 }
5809 self.assert_none(
5811 &batch,
5812 |val| val == 50,
5813 "The query contained 50 even though it was filtered",
5814 );
5815 if !params.use_new_data {
5816 self.assert_none(
5818 &batch,
5819 |val| (100..1000).contains(&val) || (val >= 1100),
5820 "The non-indexed refine filter was not applied",
5821 );
5822 }
5823
5824 if params.use_new_data || params.use_updated {
5826 self.assert_one(
5827 &batch,
5828 |val| val == 1050,
5829 "The query did not contain 1050 from the new data",
5830 );
5831 }
5832 }
5833
5834 async fn check_vector_scalar_indexed_only(&self, params: &ScalarTestParams) {
5835 let (query_plan, batch) = self
5836 .run_query("indexed != 50", Some(self.sample_query()), params)
5837 .await;
5838 if self.dataset.is_legacy_storage() {
5839 if params.use_index {
5840 assert!(query_plan.contains("ScalarIndexQuery"));
5843 } else {
5844 assert!(query_plan.contains("MaterializeIndex"));
5846 }
5847 }
5848 self.assert_none(
5850 &batch,
5851 |val| val == 50,
5852 "The query contained 50 even though it was filtered",
5853 );
5854 if params.use_new_data {
5856 self.assert_one(
5857 &batch,
5858 |val| val == 1050,
5859 "The query did not contain 1050 from the new data",
5860 );
5861 if !params.use_new_data {
5862 let (_, batch) = self
5864 .run_query("indexed == 1050", Some(self.sample_query()), params)
5865 .await;
5866 assert_eq!(batch.num_rows(), 1);
5867 }
5868 }
5869 if params.use_deleted_data {
5870 let (_, batch) = self
5871 .run_query("indexed == 75", Some(self.delete_query()), params)
5872 .await;
5873 if !params.use_new_data {
5874 assert_eq!(batch.num_rows(), 0);
5875 }
5876 }
5877 }
5878
5879 async fn check_vector_queries(&self, params: &ScalarTestParams) {
5880 self.check_vector_scalar_indexed_only(params).await;
5881 self.check_vector_scalar_indexed_and_refine(params).await;
5882 }
5883
5884 async fn check_simple_indexed_only(&self, params: &ScalarTestParams) {
5885 let (query_plan, batch) = self.run_query("indexed != 50", None, params).await;
5886 if self.dataset.is_legacy_storage() {
5888 assert!(query_plan.contains("MaterializeIndex"));
5889 } else {
5890 assert!(query_plan.contains("LanceRead"));
5891 }
5892 self.assert_none(
5894 &batch,
5895 |val| val == 50,
5896 "The query contained 50 even though it was filtered",
5897 );
5898 let mut expected_num_rows = if params.use_new_data || params.use_updated {
5899 1999
5900 } else {
5901 999
5902 };
5903 if params.use_deleted_data || params.use_compaction {
5904 expected_num_rows -= 1;
5905 }
5906 assert_eq!(batch.num_rows(), expected_num_rows);
5907
5908 if params.use_new_data || params.use_updated {
5910 let (_, batch) = self.run_query("indexed == 1050", None, params).await;
5911 assert_eq!(batch.num_rows(), 1);
5912 }
5913
5914 if params.use_deleted_data || params.use_compaction {
5916 let (_, batch) = self.run_query("indexed == 75", None, params).await;
5917 assert_eq!(batch.num_rows(), 0);
5918 }
5919 }
5920
5921 async fn check_simple_indexed_and_refine(&self, params: &ScalarTestParams) {
5922 let (query_plan, batch) = self.run_query(
5923 "indexed != 50 AND ((not_indexed < 100) OR (not_indexed >= 1000 AND not_indexed < 1100))",
5924 None,
5925 params
5926 ).await;
5927 if self.dataset.is_legacy_storage() {
5929 assert!(query_plan.contains("MaterializeIndex"));
5930 } else {
5931 assert!(query_plan.contains("LanceRead"));
5932 }
5933 self.assert_none(
5935 &batch,
5936 |val| val == 50,
5937 "The query contained 50 even though it was filtered",
5938 );
5939 self.assert_none(
5941 &batch,
5942 |val| (100..1000).contains(&val) || (val >= 1100),
5943 "The non-indexed refine filter was not applied",
5944 );
5945
5946 let mut expected_num_rows = if params.use_new_data || params.use_updated {
5947 199
5948 } else {
5949 99
5950 };
5951 if params.use_deleted_data || params.use_compaction {
5952 expected_num_rows -= 1;
5953 }
5954 assert_eq!(batch.num_rows(), expected_num_rows);
5955 }
5956
5957 async fn check_simple_queries(&self, params: &ScalarTestParams) {
5958 self.check_simple_indexed_only(params).await;
5959 self.check_simple_indexed_and_refine(params).await;
5960 }
5961 }
5962
5963 #[rstest]
5967 #[tokio::test]
5968 async fn test_secondary_index_scans(
5969 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
5970 data_storage_version: LanceFileVersion,
5971 #[values(false, true)] use_stable_row_ids: bool,
5972 ) {
5973 let fixture = Box::pin(ScalarIndexTestFixture::new(
5974 data_storage_version,
5975 use_stable_row_ids,
5976 ))
5977 .await;
5978
5979 for use_index in [false, true] {
5980 for use_projection in [false, true] {
5981 for use_deleted_data in [false, true] {
5982 for use_new_data in [false, true] {
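// Compacted and updated versions of the fixture only exist for the base data, so skip
// those combinations when new data, deleted data, or (for compaction) stable row ids
// are involved.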
5983 let compaction_choices =
5988 if use_deleted_data || use_new_data || use_stable_row_ids {
5989 vec![false]
5990 } else {
5991 vec![false, true]
5992 };
5993 for use_compaction in compaction_choices {
5994 let updated_choices =
5995 if use_deleted_data || use_new_data || use_compaction {
5996 vec![false]
5997 } else {
5998 vec![false, true]
5999 };
6000 for use_updated in updated_choices {
6001 for with_row_id in [false, true] {
6002 let params = ScalarTestParams {
6003 use_index,
6004 use_projection,
6005 use_deleted_data,
6006 use_new_data,
6007 with_row_id,
6008 use_compaction,
6009 use_updated,
6010 };
6011 fixture.check_vector_queries(¶ms).await;
6012 fixture.check_simple_queries(¶ms).await;
6013 }
6014 }
6015 }
6016 }
6017 }
6018 }
6019 }
6020 }
6021
6022 #[tokio::test]
6023 async fn can_filter_row_id() {
6024 let dataset = lance_datagen::gen_batch()
6025 .col("x", array::step::<Int32Type>())
6026 .into_ram_dataset(FragmentCount::from(1), FragmentRowCount::from(1000))
6027 .await
6028 .unwrap();
6029
6030 let mut scan = dataset.scan();
6031 scan.with_row_id();
6032 scan.project::<&str>(&[]).unwrap();
6033 scan.filter("_rowid == 50").unwrap();
6034 let batch = scan.try_into_batch().await.unwrap();
6035 assert_eq!(batch.num_rows(), 1);
6036 assert_eq!(batch.column(0).as_primitive::<UInt64Type>().values()[0], 50);
6037 }
6038
6039 #[rstest]
6040 #[tokio::test]
6041 async fn test_index_take_batch_size() {
6042 let fixture = Box::pin(ScalarIndexTestFixture::new(LanceFileVersion::Stable, false)).await;
6043 let stream = fixture
6044 .dataset
6045 .scan()
6046 .filter("indexed > 0")
6047 .unwrap()
6048 .batch_size(16)
6049 .try_into_stream()
6050 .await
6051 .unwrap();
6052 let batches = stream.collect::<Vec<_>>().await;
6053 assert_eq!(batches.len(), 1000_usize.div_ceil(16));
6054 }
6055
6056 async fn assert_plan_equals(
6060 dataset: &Dataset,
6061 plan: impl Fn(&mut Scanner) -> Result<&mut Scanner>,
6062 expected: &str,
6063 ) -> Result<()> {
6064 let mut scan = dataset.scan();
6065 plan(&mut scan)?;
6066 let exec_plan = scan.create_plan().await?;
6067 assert_plan_node_equals(exec_plan, expected).await
6068 }
6069
6070 #[tokio::test]
6071 async fn test_count_plan() {
6072 let dim = 256;
6074 let fixture = TestVectorDataset::new_with_dimension(LanceFileVersion::Stable, true, dim)
6075 .await
6076 .unwrap();
6077
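// A count plan requires an empty projection plus the row id column; anything else is
// rejected as invalid input.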
6078 let err = fixture
6080 .dataset
6081 .scan()
6082 .create_count_plan()
6083 .await
6084 .unwrap_err();
6085 assert!(matches!(err, Error::InvalidInput { .. }));
6086
6087 let mut scan = fixture.dataset.scan();
6088 scan.project(&Vec::<String>::default()).unwrap();
6089
6090 let err = scan.create_count_plan().await.unwrap_err();
6092 assert!(matches!(err, Error::InvalidInput { .. }));
6093
6094 scan.with_row_id();
6095
6096 let plan = scan.create_count_plan().await.unwrap();
6097
6098 assert_plan_node_equals(
6099 plan,
6100 "AggregateExec: mode=Single, gby=[], aggr=[count_rows]
6101 LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=--, refine_filter=--",
6102 )
6103 .await
6104 .unwrap();
6105
6106 scan.filter("s == ''").unwrap();
6107
6108 let plan = scan.create_count_plan().await.unwrap();
6109
6110 assert_plan_node_equals(
6111 plan,
6112 "AggregateExec: mode=Single, gby=[], aggr=[count_rows]
6113 ProjectionExec: expr=[_rowid@1 as _rowid]
6114 LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=s = Utf8(\"\"), refine_filter=s = Utf8(\"\")",
6115 )
6116 .await
6117 .unwrap();
6118 }
6119
6120 #[tokio::test]
6121 async fn test_inexact_scalar_index_plans() {
6122 let data = gen_batch()
6123 .col("ngram", array::rand_utf8(ByteCount::from(5), false))
6124 .col("exact", array::rand_type(&DataType::UInt32))
6125 .col("no_index", array::rand_type(&DataType::UInt32))
6126 .into_reader_rows(RowCount::from(1000), BatchCount::from(5));
6127
6128 let mut dataset = Dataset::write(data, "memory://test", None).await.unwrap();
6129 dataset
6130 .create_index(
6131 &["ngram"],
6132 IndexType::NGram,
6133 None,
6134 &ScalarIndexParams::default(),
6135 true,
6136 )
6137 .await
6138 .unwrap();
6139 dataset
6140 .create_index(
6141 &["exact"],
6142 IndexType::BTree,
6143 None,
6144 &ScalarIndexParams::default(),
6145 true,
6146 )
6147 .await
6148 .unwrap();
6149
6150 assert_plan_equals(
6152 &dataset,
6153 |scanner| scanner.filter("contains(ngram, 'test string')"),
6154 "LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, \
6155 range_before=None, range_after=None, row_id=false, row_addr=false, \
6156 full_filter=contains(ngram, Utf8(\"test string\")), refine_filter=--
6157 ScalarIndexQuery: query=[contains(ngram, Utf8(\"test string\"))]@ngram_idx",
6158 )
6159 .await
6160 .unwrap();
6161
6162 assert_plan_equals(
6164 &dataset,
6165 |scanner| scanner.filter("contains(ngram, 'test string') and exact < 50"),
6166 "LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, \
6167 range_before=None, range_after=None, row_id=false, row_addr=false, \
6168 full_filter=contains(ngram, Utf8(\"test string\")) AND exact < UInt32(50), \
6169 refine_filter=--
6170 ScalarIndexQuery: query=AND([contains(ngram, Utf8(\"test string\"))]@ngram_idx,[exact < 50]@exact_idx)",
6171 )
6172 .await
6173 .unwrap();
6174
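// A clause on a column without an index cannot be answered by the scalar indices, so it
// stays behind as a refine filter applied after the ScalarIndexQuery.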
6175 assert_plan_equals(
6177 &dataset,
6178 |scanner| {
6179 scanner.filter("contains(ngram, 'test string') and exact < 50 AND no_index > 100")
6180 },
6181 "ProjectionExec: expr=[ngram@0 as ngram, exact@1 as exact, no_index@2 as no_index]
6182 LanceRead: uri=..., projection=[ngram, exact, no_index], num_fragments=1, range_before=None, \
6183 range_after=None, row_id=true, row_addr=false, full_filter=contains(ngram, Utf8(\"test string\")) AND exact < UInt32(50) AND no_index > UInt32(100), \
6184 refine_filter=no_index > UInt32(100)
6185 ScalarIndexQuery: query=AND([contains(ngram, Utf8(\"test string\"))]@ngram_idx,[exact < 50]@exact_idx)",
6186 )
6187 .await
6188 .unwrap();
6189 }
6190
6191 #[rstest]
6192 #[tokio::test]
6193 async fn test_late_materialization(
6194 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
6195 data_storage_version: LanceFileVersion,
6196 ) {
6197 use lance_table::io::commit::RenameCommitHandler;
6200 let data = gen_batch()
6201 .col(
6202 "vector",
6203 array::rand_vec::<Float32Type>(Dimension::from(32)),
6204 )
6205 .col("indexed", array::step::<Int32Type>())
6206 .col("not_indexed", array::step::<Int32Type>())
6207 .into_reader_rows(RowCount::from(1000), BatchCount::from(20));
6208
6209 let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
6210 let mut dataset = Dataset::write(
6211 data,
6212 "memory://test",
6213 Some(WriteParams {
6214 store_params: Some(ObjectStoreParams {
6215 object_store_wrapper: Some(io_stats_wrapper),
6216 ..Default::default()
6217 }),
6218 commit_handler: Some(Arc::new(RenameCommitHandler)),
6219 data_storage_version: Some(data_storage_version),
6220 ..Default::default()
6221 }),
6222 )
6223 .await
6224 .unwrap();
6225 dataset
6226 .create_index(
6227 &["indexed"],
6228 IndexType::Scalar,
6229 None,
6230 &ScalarIndexParams::default(),
6231 false,
6232 )
6233 .await
6234 .unwrap();
6235
6236 let get_bytes = || io_stats.lock().unwrap().read_bytes;
6237
6238 let start_bytes = get_bytes();
6240 dataset.scan().try_into_batch().await.unwrap();
6241 let full_scan_bytes = get_bytes() - start_bytes;
6242
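// A filtered scan (with stats pushdown disabled) should read fewer bytes than the full
// scan, since late materialization only fetches the other columns for matching rows.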
6243 let start_bytes = get_bytes();
6245 dataset
6246 .scan()
6247 .use_stats(false)
6248 .filter("not_indexed = 50")
6249 .unwrap()
6250 .try_into_batch()
6251 .await
6252 .unwrap();
6253 let filtered_scan_bytes = get_bytes() - start_bytes;
6254
6255 assert!(filtered_scan_bytes < full_scan_bytes);
6256
6257 if data_storage_version == LanceFileVersion::Legacy {
6260 let start_bytes = get_bytes();
6261 dataset
6262 .scan()
6263 .filter("not_indexed = 50")
6264 .unwrap()
6265 .try_into_batch()
6266 .await
6267 .unwrap();
6268 let pushdown_scan_bytes = get_bytes() - start_bytes;
6269
6270 assert!(pushdown_scan_bytes < filtered_scan_bytes);
6271 }
6272
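// A filter on the indexed column should be answered by the scalar index and read less
// data than a full scan.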
6273 let start_bytes = get_bytes();
6277 dataset
6278 .scan()
6279 .filter("indexed = 50")
6280 .unwrap()
6281 .try_into_batch()
6282 .await
6283 .unwrap();
6284 let index_scan_bytes = get_bytes() - start_bytes;
6285 assert!(index_scan_bytes < full_scan_bytes);
6286
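// Running the same indexed query a second time should read even fewer bytes, since the
// scalar index is cached after the first query.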
6287 let start_bytes = get_bytes();
6290 dataset
6291 .scan()
6292 .filter("indexed = 50")
6293 .unwrap()
6294 .try_into_batch()
6295 .await
6296 .unwrap();
6297 let second_index_scan_bytes = get_bytes() - start_bytes;
6298 assert!(second_index_scan_bytes < index_scan_bytes);
6299 }
6300
6301 #[rstest]
6302 #[tokio::test]
6303 async fn test_project_nested(
6304 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
6305 data_storage_version: LanceFileVersion,
6306 ) -> Result<()> {
6307 let struct_i_field = ArrowField::new("i", DataType::Int32, true);
6308 let struct_o_field = ArrowField::new("o", DataType::Utf8, true);
6309 let schema = Arc::new(ArrowSchema::new(vec![
6310 ArrowField::new(
6311 "struct",
6312 DataType::Struct(vec![struct_i_field.clone(), struct_o_field.clone()].into()),
6313 true,
6314 ),
6315 ArrowField::new("s", DataType::Utf8, true),
6316 ]));
6317
6318 let input_batches: Vec<RecordBatch> = (0..5)
6319 .map(|i| {
6320 let struct_i_arr: Arc<Int32Array> =
6321 Arc::new(Int32Array::from_iter_values(i * 20..(i + 1) * 20));
6322 let struct_o_arr: Arc<StringArray> = Arc::new(StringArray::from_iter_values(
6323 (i * 20..(i + 1) * 20).map(|v| format!("o-{:02}", v)),
6324 ));
6325 RecordBatch::try_new(
6326 schema.clone(),
6327 vec![
6328 Arc::new(StructArray::from(vec![
6329 (Arc::new(struct_i_field.clone()), struct_i_arr as ArrayRef),
6330 (Arc::new(struct_o_field.clone()), struct_o_arr as ArrayRef),
6331 ])),
6332 Arc::new(StringArray::from_iter_values(
6333 (i * 20..(i + 1) * 20).map(|v| format!("s-{}", v)),
6334 )),
6335 ],
6336 )
6337 .unwrap()
6338 })
6339 .collect();
6340 let batches =
6341 RecordBatchIterator::new(input_batches.clone().into_iter().map(Ok), schema.clone());
6342 let test_dir = TempStrDir::default();
6343 let test_uri = &test_dir;
6344 let write_params = WriteParams {
6345 max_rows_per_file: 40,
6346 max_rows_per_group: 10,
6347 data_storage_version: Some(data_storage_version),
6348 ..Default::default()
6349 };
6350 Dataset::write(batches, test_uri, Some(write_params))
6351 .await
6352 .unwrap();
6353
6354 let dataset = Dataset::open(test_uri).await.unwrap();
6355
6356 let batches = dataset
6357 .scan()
6358 .project(&["struct.i"])
6359 .unwrap()
6360 .try_into_stream()
6361 .await
6362 .unwrap()
6363 .try_collect::<Vec<_>>()
6364 .await
6365 .unwrap();
6366 let batch = concat_batches(&batches[0].schema(), &batches).unwrap();
6367 assert!(batch.column_by_name("struct.i").is_some());
6368 Ok(())
6369 }
6370
6371 #[rstest]
6372 #[tokio::test]
6373 async fn test_plans(
6374 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
6375 data_storage_version: LanceFileVersion,
6376 #[values(false, true)] stable_row_id: bool,
6377 ) -> Result<()> {
6378 use lance_index::scalar::inverted::query::BoostQuery;
6381 let dim = 256;
6382 let mut dataset =
6383 TestVectorDataset::new_with_dimension(data_storage_version, stable_row_id, dim).await?;
6384 let lance_schema = dataset.dataset.schema();
6385
6386 if data_storage_version == LanceFileVersion::Legacy {
6390 log::info!("Test case: Pushdown scan");
6391 assert_plan_equals(
6392 &dataset.dataset,
6393 |scan| scan.project(&["s"])?.filter("i > 10 and i < 20"),
6394 "LancePushdownScan: uri=..., projection=[s], predicate=i > Int32(10) AND i < Int32(20), row_id=false, row_addr=false, ordered=true"
6395 ).await?;
6396 }
6397
6398 log::info!("Test case: Project and filter");
6399 let expected = if data_storage_version == LanceFileVersion::Legacy {
6400 "ProjectionExec: expr=[s@2 as s]
6401 Take: columns=\"i, _rowid, (s)\"
6402 CoalesceBatchesExec: target_batch_size=8192
6403 FilterExec: i@0 > 10 AND i@0 < 20
6404 LanceScan: uri..., projection=[i], row_id=true, row_addr=false, ordered=true, range=None"
6405 } else {
6406 "ProjectionExec: expr=[s@2 as s]
6407 Take: columns=\"i, _rowid, (s)\"
6408 CoalesceBatchesExec: target_batch_size=8192
6409 LanceRead: ..., projection=[i], num_fragments=2, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10) AND i < Int32(20), refine_filter=i > Int32(10) AND i < Int32(20)"
6410 };
6411 assert_plan_equals(
6412 &dataset.dataset,
6413 |scan| {
6414 scan.use_stats(false)
6415 .project(&["s"])?
6416 .filter("i > 10 and i < 20")
6417 },
6418 expected,
6419 )
6420 .await?;
6421
6422 log::info!("Test case: Late materialization");
6425 let expected = if data_storage_version == LanceFileVersion::Legacy {
6426 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@3 as vec]
6427 Take: columns=\"i, s, _rowid, (vec)\"
6428 CoalesceBatchesExec: target_batch_size=8192
6429 FilterExec: s@1 IS NOT NULL
6430 LanceScan: uri..., projection=[i, s], row_id=true, row_addr=false, ordered=true, range=None"
6431 } else {
6432 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@3 as vec]
6433 Take: columns=\"i, s, _rowid, (vec)\"
6434 CoalesceBatchesExec: target_batch_size=8192
6435 LanceRead: uri=..., projection=[i, s], num_fragments=2, range_before=None, range_after=None, \
6436 row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
6437 };
6438 assert_plan_equals(
6439 &dataset.dataset,
6440 |scan| scan.use_stats(false).filter("s IS NOT NULL"),
6441 expected,
6442 )
6443 .await?;
6444
6445 log::info!("Test case: Custom materialization (all early)");
6447 let expected = if data_storage_version == LanceFileVersion::Legacy {
6448 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@2 as vec]
6449 FilterExec: s@1 IS NOT NULL
6450 LanceScan: uri..., projection=[i, s, vec], row_id=true, row_addr=false, ordered=true, range=None"
6451 } else {
6452 "ProjectionExec: expr=[i@0 as i, s@1 as s, vec@2 as vec]
6453 LanceRead: uri=..., projection=[i, s, vec], num_fragments=2, range_before=None, \
6454 range_after=None, row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
6455 };
6456 assert_plan_equals(
6457 &dataset.dataset,
6458 |scan| {
6459 scan.use_stats(false)
6460 .materialization_style(MaterializationStyle::AllEarly)
6461 .filter("s IS NOT NULL")
6462 },
6463 expected,
6464 )
6465 .await?;
6466
6467 log::info!("Test case: Custom materialization 2 (all late)");
6468 let expected = if data_storage_version == LanceFileVersion::Legacy {
6469 "ProjectionExec: expr=[i@2 as i, s@0 as s, vec@3 as vec]
6470 Take: columns=\"s, _rowid, (i), (vec)\"
6471 CoalesceBatchesExec: target_batch_size=8192
6472 FilterExec: s@0 IS NOT NULL
6473 LanceScan: uri..., projection=[s], row_id=true, row_addr=false, ordered=true, range=None"
6474 } else {
6475 "ProjectionExec: expr=[i@2 as i, s@0 as s, vec@3 as vec]
6476 Take: columns=\"s, _rowid, (i), (vec)\"
6477 CoalesceBatchesExec: target_batch_size=8192
6478 LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, \
6479 range_after=None, row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
6480 };
6481 assert_plan_equals(
6482 &dataset.dataset,
6483 |scan| {
6484 scan.use_stats(false)
6485 .materialization_style(MaterializationStyle::AllLate)
6486 .filter("s IS NOT NULL")
6487 },
6488 expected,
6489 )
6490 .await?;
6491
6492 log::info!("Test case: Custom materialization 3 (mixed)");
6493 let expected = if data_storage_version == LanceFileVersion::Legacy {
6494 "ProjectionExec: expr=[i@3 as i, s@0 as s, vec@1 as vec]
6495 Take: columns=\"s, vec, _rowid, (i)\"
6496 CoalesceBatchesExec: target_batch_size=8192
6497 FilterExec: s@0 IS NOT NULL
6498 LanceScan: uri..., projection=[s, vec], row_id=true, row_addr=false, ordered=true, range=None"
6499 } else {
6500 "ProjectionExec: expr=[i@3 as i, s@0 as s, vec@1 as vec]
6501 Take: columns=\"s, vec, _rowid, (i)\"
6502 CoalesceBatchesExec: target_batch_size=8192
6503 LanceRead: uri=..., projection=[s, vec], num_fragments=2, range_before=None, range_after=None, \
6504 row_id=true, row_addr=false, full_filter=s IS NOT NULL, refine_filter=s IS NOT NULL"
6505 };
6506 assert_plan_equals(
6507 &dataset.dataset,
6508 |scan| {
6509 scan.use_stats(false)
6510 .materialization_style(
6511 MaterializationStyle::all_early_except(&["i"], lance_schema).unwrap(),
6512 )
6513 .filter("s IS NOT NULL")
6514 },
6515 expected,
6516 )
6517 .await?;
6518
6519 log::info!("Test case: Scan out of order");
6520 let expected = if data_storage_version == LanceFileVersion::Legacy {
6521 "LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"
6522 } else {
6523 "LanceRead: uri=..., projection=[s], num_fragments=2, range_before=None, range_after=None, row_id=true, \
6524 row_addr=false, full_filter=--, refine_filter=--"
6525 };
6526 assert_plan_equals(
6527 &dataset.dataset,
6528 |scan| Ok(scan.project(&["s"])?.with_row_id().scan_in_order(false)),
6529 expected,
6530 )
6531 .await?;
6532
6533 let q: Float32Array = (32..32 + dim).map(|v| v as f32).collect();
6536 log::info!("Test case: Basic KNN");
6537 let expected = if data_storage_version == LanceFileVersion::Legacy {
6538 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6539 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6540 CoalesceBatchesExec: target_batch_size=8192
6541 FilterExec: _distance@2 IS NOT NULL
6542 SortExec: TopK(fetch=5), expr=...
6543 KNNVectorDistance: metric=l2
6544 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
6545 } else {
6546 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6547 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6548 CoalesceBatchesExec: target_batch_size=8192
6549 FilterExec: _distance@2 IS NOT NULL
6550 SortExec: TopK(fetch=5), expr=...
6551 KNNVectorDistance: metric=l2
6552 LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
6553 row_id=true, row_addr=false, full_filter=--, refine_filter=--"
6554 };
6555 assert_plan_equals(
6556 &dataset.dataset,
6557 |scan| scan.nearest("vec", &q, 5),
6558 expected,
6559 )
6560 .await?;
6561
6562 let q: Float32Array = (32..32 + dim).map(|v| v as f32).collect();
6565 log::info!("Test case: KNN with extraneous limit");
6566 let expected = if data_storage_version == LanceFileVersion::Legacy {
6567 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6568 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6569 CoalesceBatchesExec: target_batch_size=8192
6570 GlobalLimitExec: skip=0, fetch=1
6571 FilterExec: _distance@2 IS NOT NULL
6572 SortExec: TopK(fetch=5), expr=...
6573 KNNVectorDistance: metric=l2
6574 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
6575 } else {
6576 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6577 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6578 CoalesceBatchesExec: target_batch_size=8192
6579 GlobalLimitExec: skip=0, fetch=1
6580 FilterExec: _distance@2 IS NOT NULL
6581 SortExec: TopK(fetch=5), expr=...
6582 KNNVectorDistance: metric=l2
6583 LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
6584 row_id=true, row_addr=false, full_filter=--, refine_filter=--"
6585 };
6586 assert_plan_equals(
6587 &dataset.dataset,
6588 |scan| scan.nearest("vec", &q, 5)?.limit(Some(1), None),
6589 expected,
6590 )
6591 .await?;
6592
6593 dataset.make_vector_index().await?;
6596 log::info!("Test case: Basic ANN");
6597 let expected =
6598 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6599 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6600 CoalesceBatchesExec: target_batch_size=8192
6601 SortExec: TopK(fetch=42), expr=...
6602 ANNSubIndex: name=..., k=42, deltas=1
6603 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
6604 assert_plan_equals(
6605 &dataset.dataset,
6606 |scan| scan.nearest("vec", &q, 42),
6607 expected,
6608 )
6609 .await?;
6610
6611 log::info!("Test case: ANN with refine");
6612 let expected =
6613 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6614 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6615 CoalesceBatchesExec: target_batch_size=8192
6616 FilterExec: _distance@... IS NOT NULL
6617 SortExec: TopK(fetch=10), expr=...
6618 KNNVectorDistance: metric=l2
6619 Take: columns=\"_distance, _rowid, (vec)\"
6620 CoalesceBatchesExec: target_batch_size=8192
6621 SortExec: TopK(fetch=40), expr=...
6622 ANNSubIndex: name=..., k=40, deltas=1
6623 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
6624 assert_plan_equals(
6625 &dataset.dataset,
6626 |scan| Ok(scan.nearest("vec", &q, 10)?.refine(4)),
6627 expected,
6628 )
6629 .await?;
6630
6631 log::info!("Test case: ANN with index disabled");
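6632 // use_index(false) forces the brute-force KNN path even though an ANN index exists.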
6633 let expected = if data_storage_version == LanceFileVersion::Legacy {
6634 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6635 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6636 CoalesceBatchesExec: target_batch_size=8192
6637 FilterExec: _distance@... IS NOT NULL
6638 SortExec: TopK(fetch=13), expr=...
6639 KNNVectorDistance: metric=l2
6640 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None"
6641 } else {
6642 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@0 as vec, _distance@2 as _distance]
6643 Take: columns=\"vec, _rowid, _distance, (i), (s)\"
6644 CoalesceBatchesExec: target_batch_size=8192
6645 FilterExec: _distance@... IS NOT NULL
6646 SortExec: TopK(fetch=13), expr=...
6647 KNNVectorDistance: metric=l2
6648 LanceRead: uri=..., projection=[vec], num_fragments=2, range_before=None, range_after=None, \
6649 row_id=true, row_addr=false, full_filter=--, refine_filter=--"
6650 };
6651 assert_plan_equals(
6652 &dataset.dataset,
6653 |scan| Ok(scan.nearest("vec", &q, 13)?.use_index(false)),
6654 expected,
6655 )
6656 .await?;
6657
6658 log::info!("Test case: ANN with postfilter");
6659 let expected = "ProjectionExec: expr=[s@3 as s, vec@4 as vec, _distance@0 as _distance, _rowid@1 as _rowid]
6660 Take: columns=\"_distance, _rowid, i, (s), (vec)\"
6661 CoalesceBatchesExec: target_batch_size=8192
6662 FilterExec: i@2 > 10
6663 Take: columns=\"_distance, _rowid, (i)\"
6664 CoalesceBatchesExec: target_batch_size=8192
6665 SortExec: TopK(fetch=17), expr=...
6666 ANNSubIndex: name=..., k=17, deltas=1
6667 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
6668 assert_plan_equals(
6669 &dataset.dataset,
6670 |scan| {
6671 Ok(scan
6672 .nearest("vec", &q, 17)?
6673 .filter("i > 10")?
6674 .project(&["s", "vec"])?
6675 .with_row_id())
6676 },
6677 expected,
6678 )
6679 .await?;
6680
6681 log::info!("Test case: ANN with prefilter");
6682 let expected = if data_storage_version == LanceFileVersion::Legacy {
6683 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6684 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6685 CoalesceBatchesExec: target_batch_size=8192
6686 SortExec: TopK(fetch=17), expr=...
6687 ANNSubIndex: name=..., k=17, deltas=1
6688 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6689 FilterExec: i@0 > 10
6690 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
6691 } else {
6692 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6693 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6694 CoalesceBatchesExec: target_batch_size=8192
6695 SortExec: TopK(fetch=17), expr=...
6696 ANNSubIndex: name=..., k=17, deltas=1
6697 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6698 LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \
6699 row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
6701 };
6702 assert_plan_equals(
6703 &dataset.dataset,
6704 |scan| {
6705 Ok(scan
6706 .nearest("vec", &q, 17)?
6707 .filter("i > 10")?
6708 .prefilter(true))
6709 },
6710 expected,
6711 )
6712 .await?;
6713
6714 dataset.append_new_data().await?;
6715 log::info!("Test case: Combined KNN/ANN");
6716 let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6717 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6718 CoalesceBatchesExec: target_batch_size=8192
6719 FilterExec: _distance@... IS NOT NULL
6720 SortExec: TopK(fetch=6), expr=...
6721 KNNVectorDistance: metric=l2
6722 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6723 UnionExec
6724 ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
6725 FilterExec: _distance@... IS NOT NULL
6726 SortExec: TopK(fetch=6), expr=...
6727 KNNVectorDistance: metric=l2
6728 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
6729 Take: columns=\"_distance, _rowid, (vec)\"
6730 CoalesceBatchesExec: target_batch_size=8192
6731 SortExec: TopK(fetch=6), expr=...
6732 ANNSubIndex: name=..., k=6, deltas=1
6733 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
6734 assert_plan_equals(
6735 &dataset.dataset,
6736 |scan| scan.nearest("vec", &q, 6),
6737 expected,
6740 )
6741 .await?;
6742
6743 log::info!("Test case: Combined KNN/ANN with postfilter");
6745 let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6746 Take: columns=\"_rowid, vec, _distance, i, (s)\"
6747 CoalesceBatchesExec: target_batch_size=8192
6748 FilterExec: i@3 > 10
6749 Take: columns=\"_rowid, vec, _distance, (i)\"
6750 CoalesceBatchesExec: target_batch_size=8192
6751 FilterExec: _distance@... IS NOT NULL
6752 SortExec: TopK(fetch=15), expr=...
6753 KNNVectorDistance: metric=l2
6754 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6755 UnionExec
6756 ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
6757 FilterExec: _distance@... IS NOT NULL
6758 SortExec: TopK(fetch=15), expr=...
6759 KNNVectorDistance: metric=l2
6760 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
6761 Take: columns=\"_distance, _rowid, (vec)\"
6762 CoalesceBatchesExec: target_batch_size=8192
6763 SortExec: TopK(fetch=15), expr=...
6764 ANNSubIndex: name=..., k=15, deltas=1
6765 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1";
6766 assert_plan_equals(
6767 &dataset.dataset,
6768 |scan| scan.nearest("vec", &q, 15)?.filter("i > 10"),
6769 expected,
6770 )
6771 .await?;
6772
6773 log::info!("Test case: Combined KNN/ANN with prefilter");
6775 let expected = if data_storage_version == LanceFileVersion::Legacy {
6776 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6777 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6778 CoalesceBatchesExec: target_batch_size=8192
6779 FilterExec: _distance@... IS NOT NULL
6780 SortExec: TopK(fetch=5), expr=...
6781 KNNVectorDistance: metric=l2
6782 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6783 UnionExec
6784 ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
6785 FilterExec: _distance@... IS NOT NULL
6786 SortExec: TopK(fetch=5), expr=...
6787 KNNVectorDistance: metric=l2
6788 FilterExec: i@1 > 10
6789 LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
6790 Take: columns=\"_distance, _rowid, (vec)\"
6791 CoalesceBatchesExec: target_batch_size=8192
6792 SortExec: TopK(fetch=5), expr=...
6793 ANNSubIndex: name=..., k=5, deltas=1
6794 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6795 FilterExec: i@0 > 10
6796 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
6797 } else {
6798 "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6799 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6800 CoalesceBatchesExec: target_batch_size=8192
6801 FilterExec: _distance@... IS NOT NULL
6802 SortExec: TopK(fetch=5), expr=...
6803 KNNVectorDistance: metric=l2
6804 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6805 UnionExec
6806 ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
6807 FilterExec: _distance@... IS NOT NULL
6808 SortExec: TopK(fetch=5), expr=...
6809 KNNVectorDistance: metric=l2
6810 FilterExec: i@1 > 10
6811 LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
6812 Take: columns=\"_distance, _rowid, (vec)\"
6813 CoalesceBatchesExec: target_batch_size=8192
6814 SortExec: TopK(fetch=5), expr=...
6815 ANNSubIndex: name=..., k=5, deltas=1
6816 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6817 LanceRead: uri=..., projection=[], num_fragments=2, range_before=None, range_after=None, \
6818 row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
6819 };
6820 assert_plan_equals(
6821 &dataset.dataset,
6822 |scan| {
6823 Ok(scan
6824 .nearest("vec", &q, 5)?
6825 .filter("i > 10")?
6826 .prefilter(true))
6827 },
6828 expected,
6831 )
6832 .await?;
6833
6834 dataset.make_vector_index().await?;
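6835 // Re-train the vector index so it covers the rows appended above; the scalar
6836 // index on `i` built next lets prefilters be answered with a ScalarIndexQuery
6837 // instead of a filtered scan.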
6838 dataset.make_scalar_index().await?;
6839
6840 log::info!("Test case: ANN with scalar index");
6841 let expected =
6842 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6843 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6844 CoalesceBatchesExec: target_batch_size=8192
6845 SortExec: TopK(fetch=5), expr=...
6846 ANNSubIndex: name=..., k=5, deltas=1
6847 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6848 ScalarIndexQuery: query=[i > 10]@i_idx";
6849 assert_plan_equals(
6850 &dataset.dataset,
6851 |scan| {
6852 Ok(scan
6853 .nearest("vec", &q, 5)?
6854 .filter("i > 10")?
6855 .prefilter(true))
6856 },
6857 expected,
6858 )
6859 .await?;
6860
6861 log::info!("Test case: ANN with scalar index disabled");
6862 let expected = if data_storage_version == LanceFileVersion::Legacy {
6863 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6864 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6865 CoalesceBatchesExec: target_batch_size=8192
6866 SortExec: TopK(fetch=5), expr=...
6867 ANNSubIndex: name=..., k=5, deltas=1
6868 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6869 FilterExec: i@0 > 10
6870 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"
6871 } else {
6872 "ProjectionExec: expr=[i@2 as i, s@3 as s, vec@4 as vec, _distance@0 as _distance]
6873 Take: columns=\"_distance, _rowid, (i), (s), (vec)\"
6874 CoalesceBatchesExec: target_batch_size=8192
6875 SortExec: TopK(fetch=5), expr=...
6876 ANNSubIndex: name=..., k=5, deltas=1
6877 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6878 LanceRead: uri=..., projection=[], num_fragments=3, range_before=None, \
6879 range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)"
6880 };
6881 assert_plan_equals(
6882 &dataset.dataset,
6883 |scan| {
6884 Ok(scan
6885 .nearest("vec", &q, 5)?
6886 .use_scalar_index(false)
6887 .filter("i > 10")?
6888 .prefilter(true))
6889 },
6890 expected,
6891 )
6892 .await?;
6893
6894 dataset.append_new_data().await?;
6895
6896 log::info!("Test case: Combined KNN/ANN with scalar index");
6897 let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6898 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6899 CoalesceBatchesExec: target_batch_size=8192
6900 FilterExec: _distance@... IS NOT NULL
6901 SortExec: TopK(fetch=8), expr=...
6902 KNNVectorDistance: metric=l2
6903 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6904 UnionExec
6905 ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
6906 FilterExec: _distance@... IS NOT NULL
6907 SortExec: TopK(fetch=8), expr=...
6908 KNNVectorDistance: metric=l2
6909 FilterExec: i@1 > 10
6910 LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
6911 Take: columns=\"_distance, _rowid, (vec)\"
6912 CoalesceBatchesExec: target_batch_size=8192
6913 SortExec: TopK(fetch=8), expr=...
6914 ANNSubIndex: name=..., k=8, deltas=1
6915 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6916 ScalarIndexQuery: query=[i > 10]@i_idx";
6917 assert_plan_equals(
6918 &dataset.dataset,
6919 |scan| {
6920 Ok(scan
6921 .nearest("vec", &q, 8)?
6922 .filter("i > 10")?
6923 .prefilter(true))
6924 },
6925 expected,
6926 )
6927 .await?;
6928
6929 log::info!(
6931 "Test case: Combined KNN/ANN with updated scalar index and outdated vector index"
6932 );
6933 let expected = "ProjectionExec: expr=[i@3 as i, s@4 as s, vec@1 as vec, _distance@2 as _distance]
6934 Take: columns=\"_rowid, vec, _distance, (i), (s)\"
6935 CoalesceBatchesExec: target_batch_size=8192
6936 FilterExec: _distance@... IS NOT NULL
6937 SortExec: TopK(fetch=11), expr=...
6938 KNNVectorDistance: metric=l2
6939 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
6940 UnionExec
6941 ProjectionExec: expr=[_distance@3 as _distance, _rowid@2 as _rowid, vec@0 as vec]
6942 FilterExec: _distance@... IS NOT NULL
6943 SortExec: TopK(fetch=11), expr=...
6944 KNNVectorDistance: metric=l2
6945 FilterExec: i@1 > 10
6946 LanceScan: uri=..., projection=[vec, i], row_id=true, row_addr=false, ordered=false, range=None
6947 Take: columns=\"_distance, _rowid, (vec)\"
6948 CoalesceBatchesExec: target_batch_size=8192
6949 SortExec: TopK(fetch=11), expr=...
6950 ANNSubIndex: name=..., k=11, deltas=1
6951 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1
6952 ScalarIndexQuery: query=[i > 10]@i_idx";
6953 dataset.make_scalar_index().await?;
6954 assert_plan_equals(
6955 &dataset.dataset,
6956 |scan| {
6957 Ok(scan
6958 .nearest("vec", &q, 11)?
6959 .filter("i > 10")?
6960 .prefilter(true))
6961 },
6962 expected,
6963 )
6964 .await?;
6965
6966 log::info!("Test case: Filtered read with scalar index");
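6967 // With the scalar index on `i`, a plain filtered read is answered by the index
6968 // (MaterializeIndex / ScalarIndexQuery) instead of a row-by-row filter.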
6969 let expected = if data_storage_version == LanceFileVersion::Legacy {
6970 "ProjectionExec: expr=[s@1 as s]
6971 Take: columns=\"_rowid, (s)\"
6972 CoalesceBatchesExec: target_batch_size=8192
6973 MaterializeIndex: query=[i > 10]@i_idx"
6974 } else {
6975 "LanceRead: uri=..., projection=[s], num_fragments=4, range_before=None, \
6976 range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
6977 ScalarIndexQuery: query=[i > 10]@i_idx"
6978 };
6979 assert_plan_equals(
6980 &dataset.dataset,
6981 |scan| scan.project(&["s"])?.filter("i > 10"),
6982 expected,
6983 )
6984 .await?;
6985
6986 if data_storage_version != LanceFileVersion::Legacy {
6987 log::info!(
6988 "Test case: Filtered read with scalar index disabled (late materialization)"
6989 );
6990 assert_plan_equals(
6991 &dataset.dataset,
6992 |scan| {
6993 scan.project(&["s"])?
6994 .use_scalar_index(false)
6995 .filter("i > 10")
6996 },
6997 "ProjectionExec: expr=[s@2 as s]
6998 Take: columns=\"i, _rowid, (s)\"
6999 CoalesceBatchesExec: target_batch_size=8192
7000 LanceRead: uri=..., projection=[i], num_fragments=4, range_before=None, \
7001 range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=i > Int32(10)",
7002 )
7003 .await?;
7004 }
7005
7006 log::info!("Test case: Empty projection");
7007 let expected = if data_storage_version == LanceFileVersion::Legacy {
7008 "ProjectionExec: expr=[_rowaddr@0 as _rowaddr]
7009 AddRowAddrExec
7010 MaterializeIndex: query=[i > 10]@i_idx"
7011 } else {
7012 "LanceRead: uri=..., projection=[], num_fragments=4, range_before=None, \
7013 range_after=None, row_id=false, row_addr=true, full_filter=i > Int32(10), refine_filter=--
7014 ScalarIndexQuery: query=[i > 10]@i_idx"
7015 };
7016 assert_plan_equals(
7017 &dataset.dataset,
7018 |scan| {
7019 scan.filter("i > 10")
7020 .unwrap()
7021 .with_row_address()
7022 .project::<&str>(&[])
7023 },
7024 expected,
7025 )
7026 .await?;
7027
7028 dataset.append_new_data().await?;
7029 log::info!("Test case: Combined Scalar/non-scalar filtered read");
7030 let expected = if data_storage_version == LanceFileVersion::Legacy {
7031 "ProjectionExec: expr=[s@1 as s]
7032 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7033 UnionExec
7034 Take: columns=\"_rowid, (s)\"
7035 CoalesceBatchesExec: target_batch_size=8192
7036 MaterializeIndex: query=[i > 10]@i_idx
7037 ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s]
7038 FilterExec: i@0 > 10
7039 LanceScan: uri=..., projection=[i, s], row_id=true, row_addr=false, ordered=false, range=None"
7040 } else {
7041 "LanceRead: uri=..., projection=[s], num_fragments=5, range_before=None, \
7042 range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
7043 ScalarIndexQuery: query=[i > 10]@i_idx"
7044 };
7045 assert_plan_equals(
7046 &dataset.dataset,
7047 |scan| scan.project(&["s"])?.filter("i > 10"),
7048 expected,
7049 )
7050 .await?;
7051
7052 log::info!("Test case: Combined Scalar/non-scalar filtered read with empty projection");
7053 let expected = if data_storage_version == LanceFileVersion::Legacy {
7054 "ProjectionExec: expr=[_rowaddr@0 as _rowaddr]
7055 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7056 UnionExec
7057 AddRowAddrExec
7058 MaterializeIndex: query=[i > 10]@i_idx
7059 ProjectionExec: expr=[_rowaddr@2 as _rowaddr, _rowid@1 as _rowid]
7060 FilterExec: i@0 > 10
7061 LanceScan: uri=..., projection=[i], row_id=true, row_addr=true, ordered=false, range=None"
7062 } else {
7063 "LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, \
7064 range_after=None, row_id=false, row_addr=true, full_filter=i > Int32(10), refine_filter=--
7065 ScalarIndexQuery: query=[i > 10]@i_idx"
7066 };
7067 assert_plan_equals(
7068 &dataset.dataset,
7069 |scan| {
7070 scan.filter("i > 10")
7071 .unwrap()
7072 .with_row_address()
7073 .project::<&str>(&[])
7074 },
7075 expected,
7076 )
7077 .await?;
7078
7079 log::info!("Test case: Dynamic projection");
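7080 // project_with_transform allows SQL expressions in the projection; the
7081 // regexp_match call is evaluated in a ProjectionExec above the filtered read.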
7082 let expected = if data_storage_version == LanceFileVersion::Legacy {
7083 "ProjectionExec: expr=[regexp_match(s@1, .*) as matches]
7084 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7085 UnionExec
7086 Take: columns=\"_rowid, (s)\"
7087 CoalesceBatchesExec: target_batch_size=8192
7088 MaterializeIndex: query=[i > 10]@i_idx
7089 ProjectionExec: expr=[_rowid@2 as _rowid, s@1 as s]
7090 FilterExec: i@0 > 10
7091 LanceScan: uri=..., row_id=true, row_addr=false, ordered=false, range=None"
7092 } else {
7093 "ProjectionExec: expr=[regexp_match(s@0, .*) as matches]
7094 LanceRead: uri=..., projection=[s], num_fragments=5, range_before=None, \
7095 range_after=None, row_id=false, row_addr=false, full_filter=i > Int32(10), refine_filter=--
7096 ScalarIndexQuery: query=[i > 10]@i_idx"
7097 };
7098 assert_plan_equals(
7099 &dataset.dataset,
7100 |scan| {
7101 scan.project_with_transform(&[("matches", "regexp_match(s, \".*\")")])?
7102 .filter("i > 10")
7103 },
7104 expected,
7105 )
7106 .await?;
7107
7108 dataset.make_fts_index().await?;
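7109 // With an inverted index on `s`, full-text queries can be planned. The cases
7110 // below exercise match, phrase, and boost queries, with and without prefilters
7111 // and unindexed rows.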
7112 log::info!("Test case: Full text search (match query)");
7113 let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7114 Take: columns="_rowid, _score, (s)"
7115 CoalesceBatchesExec: target_batch_size=8192
7116 MatchQuery: query=hello"#;
7117 assert_plan_equals(
7118 &dataset.dataset,
7119 |scan| {
7120 scan.project(&["s"])?
7121 .with_row_id()
7122 .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
7123 },
7124 expected,
7125 )
7126 .await?;
7127
7128 log::info!("Test case: Full text search (phrase query)");
7129 let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7130 Take: columns="_rowid, _score, (s)"
7131 CoalesceBatchesExec: target_batch_size=8192
7132 PhraseQuery: query=hello world"#;
7133 assert_plan_equals(
7134 &dataset.dataset,
7135 |scan| {
7136 let query = PhraseQuery::new("hello world".to_owned());
7137 scan.project(&["s"])?
7138 .with_row_id()
7139 .full_text_search(FullTextSearchQuery::new_query(query.into()))
7140 },
7141 expected,
7142 )
7143 .await?;
7144
7145 log::info!("Test case: Full text search (boost query)");
7146 let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7147 Take: columns="_rowid, _score, (s)"
7148 CoalesceBatchesExec: target_batch_size=8192
7149 BoostQuery: negative_boost=1
7150 MatchQuery: query=hello
7151 MatchQuery: query=world"#;
7152 assert_plan_equals(
7153 &dataset.dataset,
7154 |scan| {
7155 let positive =
7156 MatchQuery::new("hello".to_owned()).with_column(Some("s".to_owned()));
7157 let negative =
7158 MatchQuery::new("world".to_owned()).with_column(Some("s".to_owned()));
7159 let query = BoostQuery::new(positive.into(), negative.into(), Some(1.0));
7160 scan.project(&["s"])?
7161 .with_row_id()
7162 .full_text_search(FullTextSearchQuery::new_query(query.into()))
7163 },
7164 expected,
7165 )
7166 .await?;
7167
7168 log::info!("Test case: Full text search with prefilter");
7169 let expected = if data_storage_version == LanceFileVersion::Legacy {
7170 r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7171 Take: columns="_rowid, _score, (s)"
7172 CoalesceBatchesExec: target_batch_size=8192
7173 MatchQuery: query=hello
7174 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7175 UnionExec
7176 MaterializeIndex: query=[i > 10]@i_idx
7177 ProjectionExec: expr=[_rowid@1 as _rowid]
7178 FilterExec: i@0 > 10
7179 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None"#
7180 } else {
7181 r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7182 Take: columns="_rowid, _score, (s)"
7183 CoalesceBatchesExec: target_batch_size=8192
7184 MatchQuery: query=hello
7185 LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
7186 ScalarIndexQuery: query=[i > 10]@i_idx"#
7187 };
7188 assert_plan_equals(
7189 &dataset.dataset,
7190 |scan| {
7191 scan.project(&["s"])?
7192 .with_row_id()
7193 .filter("i > 10")?
7194 .prefilter(true)
7195 .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
7196 },
7197 expected,
7198 )
7199 .await?;
7200
7201 log::info!("Test case: Full text search with unindexed rows");
7202 let expected = r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7203 Take: columns="_rowid, _score, (s)"
7204 CoalesceBatchesExec: target_batch_size=8192
7205 SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
7206 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7207 UnionExec
7208 MatchQuery: query=hello
7209 FlatMatchQuery: query=hello
7210 LanceScan: uri=..., projection=[s], row_id=true, row_addr=false, ordered=false, range=None"#;
7211 dataset.append_new_data().await?;
7212 assert_plan_equals(
7213 &dataset.dataset,
7214 |scan| {
7215 scan.project(&["s"])?
7216 .with_row_id()
7217 .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
7218 },
7219 expected,
7220 )
7221 .await?;
7222
7223 log::info!("Test case: Full text search with unindexed rows and prefilter");
7224 let expected = if data_storage_version == LanceFileVersion::Legacy {
7225 r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7226 Take: columns="_rowid, _score, (s)"
7227 CoalesceBatchesExec: target_batch_size=8192
7228 SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
7229 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7230 UnionExec
7231 MatchQuery: query=hello
7232 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7233 UnionExec
7234 MaterializeIndex: query=[i > 10]@i_idx
7235 ProjectionExec: expr=[_rowid@1 as _rowid]
7236 FilterExec: i@0 > 10
7237 LanceScan: uri=..., projection=[i], row_id=true, row_addr=false, ordered=false, range=None
7238 FlatMatchQuery: query=hello
7239 FilterExec: i@1 > 10
7240 LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
7241 } else {
7242 r#"ProjectionExec: expr=[s@2 as s, _score@1 as _score, _rowid@0 as _rowid]
7243 Take: columns="_rowid, _score, (s)"
7244 CoalesceBatchesExec: target_batch_size=8192
7245 SortExec: expr=[_score@1 DESC NULLS LAST], preserve_partitioning=[false]
7246 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7247 UnionExec
7248 MatchQuery: query=hello
7249 LanceRead: uri=..., projection=[], num_fragments=5, range_before=None, range_after=None, row_id=true, row_addr=false, full_filter=i > Int32(10), refine_filter=--
7250 ScalarIndexQuery: query=[i > 10]@i_idx
7251 FlatMatchQuery: query=hello
7252 FilterExec: i@1 > 10
7253 LanceScan: uri=..., projection=[s, i], row_id=true, row_addr=false, ordered=false, range=None"#
7254 };
7255 assert_plan_equals(
7256 &dataset.dataset,
7257 |scan| {
7258 scan.project(&["s"])?
7259 .with_row_id()
7260 .filter("i > 10")?
7261 .prefilter(true)
7262 .full_text_search(FullTextSearchQuery::new("hello".to_owned()))
7263 },
7264 expected,
7265 )
7266 .await?;
7267
7268 Ok(())
7269 }
7270
7271 #[tokio::test]
7272 async fn test_fast_search_plan() {
7273 let mut dataset = TestVectorDataset::new(LanceFileVersion::Stable, true)
7275 .await
7276 .unwrap();
7277 dataset.make_vector_index().await.unwrap();
7278 dataset.append_new_data().await.unwrap();
7279
7280 let q: Float32Array = (32..64).map(|v| v as f32).collect();
7281
7282 assert_plan_equals(
7283 &dataset.dataset,
7284 |scan| {
7285 scan.nearest("vec", &q, 32)?
7286 .fast_search()
7287 .project(&["_distance", "_rowid"])
7288 },
7289 "SortExec: TopK(fetch=32), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
7290 ANNSubIndex: name=idx, k=32, deltas=1
7291 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
7292 )
7293 .await
7294 .unwrap();
7295
7296 assert_plan_equals(
7297 &dataset.dataset,
7298 |scan| {
7299 scan.nearest("vec", &q, 33)?
7300 .fast_search()
7301 .with_row_id()
7302 .project(&["_distance", "_rowid"])
7303 },
7304 "SortExec: TopK(fetch=33), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
7305 ANNSubIndex: name=idx, k=33, deltas=1
7306 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
7307 )
7308 .await
7309 .unwrap();
7310
7311 assert_plan_equals(
7313 &dataset.dataset,
7314 |scan| {
7315 scan.nearest("vec", &q, 34)?
7316 .with_row_id()
7317 .project(&["_distance", "_rowid"])
7318 },
7319 "ProjectionExec: expr=[_distance@2 as _distance, _rowid@0 as _rowid]
7320 FilterExec: _distance@2 IS NOT NULL
7321 SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@0 ASC NULLS LAST]...
7322 KNNVectorDistance: metric=l2
7323 RepartitionExec: partitioning=RoundRobinBatch(1), input_partitions=2
7324 UnionExec
7325 ProjectionExec: expr=[_distance@2 as _distance, _rowid@1 as _rowid, vec@0 as vec]
7326 FilterExec: _distance@2 IS NOT NULL
7327 SortExec: TopK(fetch=34), expr=[_distance@2 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
7328 KNNVectorDistance: metric=l2
7329 LanceScan: uri=..., projection=[vec], row_id=true, row_addr=false, ordered=false, range=None
7330 Take: columns=\"_distance, _rowid, (vec)\"
7331 CoalesceBatchesExec: target_batch_size=8192
7332 SortExec: TopK(fetch=34), expr=[_distance@0 ASC NULLS LAST, _rowid@1 ASC NULLS LAST]...
7333 ANNSubIndex: name=idx, k=34, deltas=1
7334 ANNIvfPartition: uuid=..., minimum_nprobes=20, maximum_nprobes=None, deltas=1",
7335 )
7336 .await
7337 .unwrap();
7338 }
7339
7340 #[rstest]
7341 #[tokio::test]
7342 pub async fn test_scan_planning_io(
7343 #[values(LanceFileVersion::Legacy, LanceFileVersion::Stable)]
7344 data_storage_version: LanceFileVersion,
7345 ) {
7346 use lance_index::scalar::inverted::tokenizer::InvertedIndexParams;
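7347 // Build a small dataset with a vector column, a text column, and two integer
7348 // columns ("indexed" and "not_indexed"), wrapping the object store with an
7349 // IO-tracking layer so we can count how many reads query planning performs.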
7350 let data = gen_batch()
7351 .col(
7352 "vector",
7353 array::rand_vec::<Float32Type>(Dimension::from(32)),
7354 )
7355 .col("text", array::rand_utf8(ByteCount::from(4), false))
7356 .col("indexed", array::step::<Int32Type>())
7357 .col("not_indexed", array::step::<Int32Type>())
7358 .into_reader_rows(RowCount::from(100), BatchCount::from(5));
7359
7360 let (io_stats_wrapper, io_stats) = IoTrackingStore::new_wrapper();
7361 let mut dataset = Dataset::write(
7362 data,
7363 "memory://test",
7364 Some(WriteParams {
7365 store_params: Some(ObjectStoreParams {
7366 object_store_wrapper: Some(io_stats_wrapper),
7367 ..Default::default()
7368 }),
7369 data_storage_version: Some(data_storage_version),
7370 ..Default::default()
7371 }),
7372 )
7373 .await
7374 .unwrap();
7375 dataset
7376 .create_index(
7377 &["indexed"],
7378 IndexType::Scalar,
7379 None,
7380 &ScalarIndexParams::default(),
7381 false,
7382 )
7383 .await
7384 .unwrap();
7385 dataset
7386 .create_index(
7387 &["text"],
7388 IndexType::Inverted,
7389 None,
7390 &InvertedIndexParams::default(),
7391 false,
7392 )
7393 .await
7394 .unwrap();
7395 dataset
7396 .create_index(
7397 &["vector"],
7398 IndexType::Vector,
7399 None,
7400 &VectorIndexParams {
7401 metric_type: DistanceType::L2,
7402 stages: vec![
7403 StageParams::Ivf(IvfBuildParams {
7404 max_iters: 2,
7405 num_partitions: Some(2),
7406 sample_rate: 2,
7407 ..Default::default()
7408 }),
7409 StageParams::PQ(PQBuildParams {
7410 max_iters: 2,
7411 num_sub_vectors: 2,
7412 ..Default::default()
7413 }),
7414 ],
7415 version: crate::index::vector::IndexFileVersion::Legacy,
7416 },
7417 false,
7418 )
7419 .await
7420 .unwrap();
7421
7422 struct IopsTracker {
7423 baseline: u64,
7424 new_iops: u64,
7425 io_stats: Arc<Mutex<IoStats>>,
7426 }
7427
7428 impl IopsTracker {
7429 fn update(&mut self) {
7430 let iops = self.io_stats.lock().unwrap().read_iops;
7431 self.new_iops = iops - self.baseline;
7432 self.baseline = iops;
7433 }
7434
7435 fn new_iops(&mut self) -> u64 {
7436 self.update();
7437 self.new_iops
7438 }
7439 }
7440
7441 let mut tracker = IopsTracker {
7442 baseline: 0,
7443 new_iops: 0,
7444 io_stats,
7445 };
7446
7447 dataset
7449 .scan()
7450 .prefilter(true)
7451 .filter("indexed > 10")
7452 .unwrap()
7453 .explain_plan(true)
7454 .await
7455 .unwrap();
7456
7457 assert!(tracker.new_iops() > 0);
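7458 // Re-planning the same filter should be served from cached index metadata and issue no new reads.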
7459
7460 dataset
7462 .scan()
7463 .prefilter(true)
7464 .filter("indexed > 10")
7465 .unwrap()
7466 .explain_plan(true)
7467 .await
7468 .unwrap();
7469
7470 assert_eq!(tracker.new_iops(), 0);
7471
7472 dataset
7473 .scan()
7474 .prefilter(true)
7475 .filter("true")
7476 .unwrap()
7477 .explain_plan(true)
7478 .await
7479 .unwrap();
7480
7481 assert_eq!(tracker.new_iops(), 0);
7482
7483 dataset
7484 .scan()
7485 .prefilter(true)
7486 .materialization_style(MaterializationStyle::AllEarly)
7487 .filter("true")
7488 .unwrap()
7489 .explain_plan(true)
7490 .await
7491 .unwrap();
7492
7493 assert_eq!(tracker.new_iops(), 0);
7494
7495 dataset
7496 .scan()
7497 .prefilter(true)
7498 .materialization_style(MaterializationStyle::AllLate)
7499 .filter("true")
7500 .unwrap()
7501 .explain_plan(true)
7502 .await
7503 .unwrap();
7504
7505 assert_eq!(tracker.new_iops(), 0);
7506 }
7507
7508 #[rstest]
7509 #[tokio::test]
7510 pub async fn test_row_meta_columns(
7511 #[values(
7512 (true, false),
7513 (false, true),
7514 (true, true)
7515 )]
7516 columns: (bool, bool),
7517 ) {
7518 let (with_row_id, with_row_address) = columns;
7519 let test_dir = TempStrDir::default();
7520 let uri = &test_dir;
7521
7522 let schema = Arc::new(arrow_schema::Schema::new(vec![
7523 arrow_schema::Field::new("data_item_id", arrow_schema::DataType::Int32, false),
7524 arrow_schema::Field::new("a", arrow_schema::DataType::Int32, false),
7525 ]));
7526
7527 let data = RecordBatch::try_new(
7528 schema.clone(),
7529 vec![
7530 Arc::new(Int32Array::from(vec![1001, 1002, 1003])),
7531 Arc::new(Int32Array::from(vec![1, 2, 3])),
7532 ],
7533 )
7534 .unwrap();
7535
7536 let dataset = Dataset::write(
7537 RecordBatchIterator::new(vec![Ok(data)], schema.clone()),
7538 uri,
7539 None,
7540 )
7541 .await
7542 .unwrap();
7543
7544 let mut scanner = dataset.scan();
7546
7547 let mut projection = vec!["data_item_id".to_string()];
7548 if with_row_id {
7549 scanner.with_row_id();
7550 projection.push(ROW_ID.to_string());
7551 }
7552 if with_row_address {
7553 scanner.with_row_address();
7554 projection.push(ROW_ADDR.to_string());
7555 }
7556
7557 scanner.project(&projection).unwrap();
7558 let stream = scanner.try_into_stream().await.unwrap();
7559 let batch = stream.try_collect::<Vec<_>>().await.unwrap().pop().unwrap();
7560
7561 if with_row_id {
7563 let column = batch.column_by_name(ROW_ID).unwrap();
7564 assert_eq!(column.data_type(), &DataType::UInt64);
7565 }
7566 if with_row_address {
7567 let column = batch.column_by_name(ROW_ADDR).unwrap();
7568 assert_eq!(column.data_type(), &DataType::UInt64);
7569 }
7570
7571 let mut scanner = dataset.scan();
7573 if with_row_id {
7574 scanner.with_row_id();
7575 }
7576 if with_row_address {
7577 scanner.with_row_address();
7578 }
7579 scanner.project(&["data_item_id"]).unwrap();
7580 let stream = scanner.try_into_stream().await.unwrap();
7581 let batch = stream.try_collect::<Vec<_>>().await.unwrap().pop().unwrap();
7582 let meta_column = batch.column_by_name(if with_row_id { ROW_ID } else { ROW_ADDR });
7583 assert!(meta_column.is_some());
7584
7585 let mut scanner = dataset.scan();
7587 if with_row_id {
7588 scanner.project(&[ROW_ID]).unwrap();
7589 } else {
7590 scanner.project(&[ROW_ADDR]).unwrap();
7591 };
7592 let stream = scanner.try_into_stream().await.unwrap();
7593 assert_eq!(stream.schema().fields().len(), 1);
7594 if with_row_id {
7595 assert!(stream.schema().field_with_name(ROW_ID).is_ok());
7596 } else {
7597 assert!(stream.schema().field_with_name(ROW_ADDR).is_ok());
7598 }
7599 }
7600
7601 async fn limit_offset_equivalency_test(scanner: &Scanner) {
7602 async fn test_one(
7603 scanner: &Scanner,
7604 full_result: &RecordBatch,
7605 limit: Option<i64>,
7606 offset: Option<i64>,
7607 ) {
7608 let mut new_scanner = scanner.clone();
7609 new_scanner.limit(limit, offset).unwrap();
7610 if let Some(nearest) = new_scanner.nearest_mut() {
7611 nearest.k = offset.unwrap_or(0).saturating_add(limit.unwrap_or(10_000)) as usize;
7612 }
7613 let result = new_scanner.try_into_batch().await.unwrap();
7614
7615 let resolved_offset = offset.unwrap_or(0).min(full_result.num_rows() as i64);
7616 let resolved_length = limit
7617 .unwrap_or(i64::MAX)
7618 .min(full_result.num_rows() as i64 - resolved_offset);
7619
7620 let expected = full_result.slice(resolved_offset as usize, resolved_length as usize);
7621
7622 if expected != result {
7623 let plan = new_scanner.analyze_plan().await.unwrap();
7624 assert_eq!(
7625 &expected, &result,
7626 "Limit: {:?}, Offset: {:?}, Plan: \n{}",
7627 limit, offset, plan
7628 );
7629 }
7630 }
7631
7632 let mut scanner_full = scanner.clone();
7633 if let Some(nearest) = scanner_full.nearest_mut() {
7634 nearest.k = 500;
7635 }
7636 let full_results = scanner_full.try_into_batch().await.unwrap();
7637
7638 test_one(scanner, &full_results, Some(1), None).await;
7639 test_one(scanner, &full_results, Some(1), Some(1)).await;
7640 test_one(scanner, &full_results, Some(1), Some(2)).await;
7641 test_one(scanner, &full_results, Some(1), Some(10)).await;
7642
7643 test_one(scanner, &full_results, Some(3), None).await;
7644 test_one(scanner, &full_results, Some(3), Some(2)).await;
7645 test_one(scanner, &full_results, Some(3), Some(4)).await;
7646
7647 test_one(scanner, &full_results, None, Some(3)).await;
7648 test_one(scanner, &full_results, None, Some(10)).await;
7649 }
7650
7651 #[tokio::test]
7652 async fn test_scan_limit_offset() {
7653 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
7654 .await
7655 .unwrap();
7656 let scanner = test_ds.dataset.scan();
7657 limit_offset_equivalency_test(&scanner).await;
7658 }
7659
7660 #[tokio::test]
7661 async fn test_knn_limit_offset() {
7662 let test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
7663 .await
7664 .unwrap();
7665 let query_vector = Float32Array::from(vec![0.0; 32]);
7666 let mut scanner = test_ds.dataset.scan();
7667 scanner
7668 .nearest("vec", &query_vector, 5)
7669 .unwrap()
7670 .project(&["i"])
7671 .unwrap();
7672 limit_offset_equivalency_test(&scanner).await;
7673 }
7674
7675 #[tokio::test]
7676 async fn test_ivf_pq_limit_offset() {
7677 let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
7678 .await
7679 .unwrap();
7680 test_ds.make_vector_index().await.unwrap();
7681 test_ds.append_new_data().await.unwrap();
7682 let query_vector = Float32Array::from(vec![0.0; 32]);
7683 let mut scanner = test_ds.dataset.scan();
7684 scanner.nearest("vec", &query_vector, 500).unwrap();
7685 limit_offset_equivalency_test(&scanner).await;
7686 }
7687
7688 #[tokio::test]
7689 async fn test_fts_limit_offset() {
7690 let mut test_ds = TestVectorDataset::new(LanceFileVersion::Stable, false)
7691 .await
7692 .unwrap();
7693 test_ds.make_fts_index().await.unwrap();
7694 test_ds.append_new_data().await.unwrap();
7695 let mut scanner = test_ds.dataset.scan();
7696 scanner
7697 .full_text_search(FullTextSearchQuery::new("4".into()))
7698 .unwrap();
7699 limit_offset_equivalency_test(&scanner).await;
7700 }
7701
7702 async fn test_row_offset_read_helper(
7703 ds: &Dataset,
7704 scan_builder: impl FnOnce(&mut Scanner) -> &mut Scanner,
7705 expected_cols: &[&str],
7706 expected_row_offsets: &[u64],
7707 ) {
7708 let mut scanner = ds.scan();
7709 let scanner = scan_builder(&mut scanner);
7710 let stream = scanner.try_into_stream().await.unwrap();
7711
7712 let schema = stream.schema();
7713 let actual_cols = schema
7714 .fields()
7715 .iter()
7716 .map(|f| f.name().as_str())
7717 .collect::<Vec<_>>();
7718 assert_eq!(&actual_cols, expected_cols);
7719
7720 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
7721 let batch = arrow_select::concat::concat_batches(&schema, &batches).unwrap();
7722
7723 let row_offsets = batch
7724 .column_by_name(ROW_OFFSET)
7725 .unwrap()
7726 .as_primitive::<UInt64Type>()
7727 .values();
7728 assert_eq!(row_offsets.as_ref(), expected_row_offsets);
7729 }
7730
7731 #[tokio::test]
7732 async fn test_row_offset_read() {
7733 let mut ds = lance_datagen::gen_batch()
7734 .col("idx", array::step::<Int32Type>())
7735 .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(3))
7736 .await
7737 .unwrap();
7738 ds.delete("idx >= 2 AND idx <= 6").await.unwrap();
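7739 // The dataset holds idx 0..=8 across three fragments; deleting idx 2..=6 leaves
7740 // idx 0, 1, 7, 8, which occupy row offsets 0..=3 after the delete.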
7742
7743 test_row_offset_read_helper(
7745 &ds,
7746 |scanner| scanner.project(&["idx", ROW_OFFSET]).unwrap(),
7747 &["idx", ROW_OFFSET],
7748 &[0, 1, 2, 3],
7749 )
7750 .await;
7751
7752 test_row_offset_read_helper(
7754 &ds,
7755 |scanner| scanner.project(&[ROW_OFFSET]).unwrap(),
7756 &[ROW_OFFSET],
7757 &[0, 1, 2, 3],
7758 )
7759 .await;
7760
7761 test_row_offset_read_helper(
7763 &ds,
7764 |scanner| {
7765 scanner
7766 .filter("idx > 1")
7767 .unwrap()
7768 .project(&[ROW_OFFSET])
7769 .unwrap()
7770 },
7771 &[ROW_OFFSET],
7772 &[2, 3],
7773 )
7774 .await;
7775 }
7776
7777 #[tokio::test]
7778 async fn test_filter_to_take() {
7779 let mut ds = lance_datagen::gen_batch()
7780 .col("idx", array::step::<Int32Type>())
7781 .into_ram_dataset(FragmentCount::from(3), FragmentRowCount::from(100))
7782 .await
7783 .unwrap();
7784
7785 let row_ids = ds
7786 .scan()
7787 .project(&Vec::<&str>::default())
7788 .unwrap()
7789 .with_row_id()
7790 .try_into_stream()
7791 .await
7792 .unwrap()
7793 .try_collect::<Vec<_>>()
7794 .await
7795 .unwrap();
7796 let schema = row_ids[0].schema();
7797 let row_ids = concat_batches(&schema, row_ids.iter()).unwrap();
7798 let row_ids = row_ids.column(0).as_primitive::<UInt64Type>().clone();
7799
7800 let row_addrs = ds
7801 .scan()
7802 .project(&Vec::<&str>::default())
7803 .unwrap()
7804 .with_row_address()
7805 .try_into_stream()
7806 .await
7807 .unwrap()
7808 .try_collect::<Vec<_>>()
7809 .await
7810 .unwrap();
7811 let schema = row_addrs[0].schema();
7812 let row_addrs = concat_batches(&schema, row_addrs.iter()).unwrap();
7813 let row_addrs = row_addrs.column(0).as_primitive::<UInt64Type>().clone();
7814
7815 ds.delete("idx >= 190 AND idx < 210").await.unwrap();
7816
7817 let ds_copy = ds.clone();
7818 let do_check = async move |filt: &str, expected_idx: &[i32], applies_optimization: bool| {
7819 let mut scanner = ds_copy.scan();
7820 scanner.filter(filt).unwrap();
7821 let plan = scanner.explain_plan(true).await.unwrap();
7823 if applies_optimization {
7824 assert!(
7825 plan.contains("OneShotStream"),
7826 "expected take optimization to be applied. Filter: '{}'. Plan:\n{}",
7827 filt,
7828 plan
7829 );
7830 } else {
7831 assert!(
7832 !plan.contains("OneShotStream"),
7833 "expected take optimization to not be applied. Filter: '{}'. Plan:\n{}",
7834 filt,
7835 plan
7836 );
7837 }
7838
7839 let stream = scanner.try_into_stream().await.unwrap();
7841 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
7842 let idx = batches
7843 .iter()
7844 .map(|b| b.column_by_name("idx").unwrap().as_ref())
7845 .collect::<Vec<_>>();
7846
7847 if idx.is_empty() {
7848 assert!(expected_idx.is_empty());
7849 return;
7850 }
7851
7852 let idx = arrow::compute::concat(&idx).unwrap();
7853 assert_eq!(idx.as_primitive::<Int32Type>().values(), expected_idx);
7854 };
7855 let check =
7856 async |filt: &str, expected_idx: &[i32]| do_check(filt, expected_idx, true).await;
7857 let check_no_opt = async |filt: &str, expected_idx: &[i32]| {
7858 do_check(filt, expected_idx, false).await;
7859 };
7860
7861 check("_rowid = 50", &[50]).await;
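7862 // Row addresses and row offsets coincide with row ids here, so the same lookups work for all three meta columns.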
7863 check("_rowaddr = 50", &[50]).await;
7864 check("_rowoffset = 50", &[50]).await;
7865
7866 check(
7867 "_rowid = 50 OR _rowid = 51 OR _rowid = 52 OR _rowid = 49",
7868 &[49, 50, 51, 52],
7869 )
7870 .await;
7871 check(
7872 "_rowaddr = 50 OR _rowaddr = 51 OR _rowaddr = 52 OR _rowaddr = 49",
7873 &[49, 50, 51, 52],
7874 )
7875 .await;
7876 check(
7877 "_rowoffset = 50 OR _rowoffset = 51 OR _rowoffset = 52 OR _rowoffset = 49",
7878 &[49, 50, 51, 52],
7879 )
7880 .await;
7881
7882 check("_rowid IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;
7883 check("_rowaddr IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;
7884 check("_rowoffset IN (52, 51, 50, 17)", &[17, 50, 51, 52]).await;
7885
7886 check(&format!("_rowid = {}", row_ids.value(190)), &[]).await;
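7887 // idx 190..=209 were deleted above, so the old row id (and, below, the old row
7888 // address) for idx 190 no longer matches anything, while row offset 190 now
7889 // points at the first surviving row after the hole, idx 210.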
7890 check(&format!("_rowaddr = {}", row_addrs.value(190)), &[]).await;
7891 check("_rowoffset = 190", &[210]).await;
7894
7895 check(&format!("_rowid = {}", row_ids.value(250)), &[250]).await;
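7896 // Rows past the deleted range keep their row id and row address; their row offset drops by the 20 deleted rows (offset 250 is idx 270).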
7897 check(&format!("_rowaddr = {}", row_addrs.value(250)), &[250]).await;
7898 check("_rowoffset = 250", &[270]).await;
7899
7900 check("_rowoffset = 1000", &[]).await;
7902
7903 check("_rowid IN (5, 10, 15) AND idx > 10", &[15]).await;
7905 check("_rowaddr IN (5, 10, 15) AND idx > 10", &[15]).await;
7906 check("_rowoffset IN (5, 10, 15) AND idx > 10", &[15]).await;
7907 check("idx > 10 AND _rowid IN (5, 10, 15)", &[15]).await;
7908 check("idx > 10 AND _rowaddr IN (5, 10, 15)", &[15]).await;
7909 check("idx > 10 AND _rowoffset IN (5, 10, 15)", &[15]).await;
7910 check("_rowid = 50 AND _rowid = 50", &[50]).await;
7912
7913 check_no_opt("_rowid = 50 AND _rowid = 51", &[]).await;
7915 check_no_opt("(_rowid = 50 AND idx < 100) OR _rowid = 51", &[50, 51]).await;
7916
7917 let mut scanner = ds.scan();
7919 scanner.filter("_rowoffset = 77").unwrap();
7920 scanner
7921 .project_with_transform(&[("foo", "idx * 2")])
7922 .unwrap();
7923 let stream = scanner.try_into_stream().await.unwrap();
7924 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
7925 assert_eq!(batches[0].schema().field(0).name(), "foo");
7926 let val = batches[0].column(0).as_primitive::<Int32Type>().values()[0];
7927 assert_eq!(val, 154);
7928 }
7929
7930 #[tokio::test]
7931 async fn test_nested_field_ordering() {
7932 use arrow_array::StructArray;
7933
7934 let id_array = Int32Array::from(vec![3, 1, 2]);
7936 let nested_values = Int32Array::from(vec![30, 10, 20]);
7937 let nested_struct = StructArray::from(vec![(
7938 Arc::new(ArrowField::new("value", DataType::Int32, false)),
7939 Arc::new(nested_values) as ArrayRef,
7940 )]);
7941
7942 let schema = Arc::new(ArrowSchema::new(vec![
7943 ArrowField::new("id", DataType::Int32, false),
7944 ArrowField::new(
7945 "nested",
7946 DataType::Struct(vec![ArrowField::new("value", DataType::Int32, false)].into()),
7947 false,
7948 ),
7949 ]));
7950
7951 let batch = RecordBatch::try_new(
7952 schema.clone(),
7953 vec![Arc::new(id_array), Arc::new(nested_struct)],
7954 )
7955 .unwrap();
7956
7957 let test_dir = TempStrDir::default();
7958 let test_uri = &test_dir;
7959 let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());
7960
7961 let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
7962
7963 let mut scanner = dataset.scan();
7965 scanner
7966 .order_by(Some(vec![ColumnOrdering {
7967 column_name: "nested.value".to_string(),
7968 ascending: true,
7969 nulls_first: true,
7970 }]))
7971 .unwrap();
7973 let stream = scanner.try_into_stream().await.unwrap();
7974 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
7975
7976 let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
7978 assert_eq!(sorted_ids[0], 1);
7979 assert_eq!(sorted_ids[1], 2);
7980 assert_eq!(sorted_ids[2], 3);
7981 }
7982
7983 #[tokio::test]
7984 async fn test_limit_with_ordering_not_pushed_down() {
7985 let id_array = Int32Array::from(vec![5, 2, 8, 1, 3, 7, 4, 6]);
7991 let value_array = Int32Array::from(vec![50, 20, 80, 10, 30, 70, 40, 60]);
7992
7993 let schema = Arc::new(ArrowSchema::new(vec![
7994 ArrowField::new("id", DataType::Int32, false),
7995 ArrowField::new("value", DataType::Int32, false),
7996 ]));
7997
7998 let batch = RecordBatch::try_new(
7999 schema.clone(),
8000 vec![Arc::new(id_array), Arc::new(value_array)],
8001 )
8002 .unwrap();
8003
8004 let test_dir = TempStrDir::default();
8005 let test_uri = &test_dir;
8006 let reader = RecordBatchIterator::new(vec![Ok(batch)].into_iter(), schema.clone());
8007
8008 let dataset = Dataset::write(reader, test_uri, None).await.unwrap();
8009
8010 let mut scanner = dataset.scan();
8012 scanner
8013 .order_by(Some(vec![ColumnOrdering {
8014 column_name: "value".to_string(),
8015 ascending: true,
8016 nulls_first: true,
8017 }]))
8018 .unwrap();
8019 scanner.limit(Some(3), None).unwrap();
8020
8021 let stream = scanner.try_into_stream().await.unwrap();
8022 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
8023
8024 let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
8026 let sorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
8027 assert_eq!(batches[0].num_rows(), 3);
8028 assert_eq!(sorted_ids[0], 1);
8029 assert_eq!(sorted_ids[1], 2);
8030 assert_eq!(sorted_ids[2], 3);
8031 assert_eq!(sorted_values[0], 10);
8032 assert_eq!(sorted_values[1], 20);
8033 assert_eq!(sorted_values[2], 30);
8034
8035 let mut scanner = dataset.scan();
8037 scanner
8038 .order_by(Some(vec![ColumnOrdering {
8039 column_name: "value".to_string(),
8040 ascending: true,
8041 nulls_first: true,
8042 }]))
8043 .unwrap();
8044 scanner.limit(Some(3), Some(2)).unwrap();
8046 let stream = scanner.try_into_stream().await.unwrap();
8047 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
8048
8049 let sorted_ids = batches[0].column(0).as_primitive::<Int32Type>().values();
8050 let sorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
8051 assert_eq!(batches[0].num_rows(), 3);
8052 assert_eq!(sorted_ids[0], 3);
8053 assert_eq!(sorted_ids[1], 4);
8054 assert_eq!(sorted_ids[2], 5);
8055 assert_eq!(sorted_values[0], 30);
8056 assert_eq!(sorted_values[1], 40);
8057 assert_eq!(sorted_values[2], 50);
8058
8059 let mut scanner = dataset.scan();
8061 scanner.limit(Some(3), None).unwrap();
8062
8063 let stream = scanner.try_into_stream().await.unwrap();
8064 let batches = stream.try_collect::<Vec<_>>().await.unwrap();
8065
8066 assert_eq!(batches[0].num_rows(), 3);
8068 let unsorted_values = batches[0].column(1).as_primitive::<Int32Type>().values();
8069 assert_eq!(unsorted_values[0], 50);
8071 assert_eq!(unsorted_values[1], 20);
8072 assert_eq!(unsorted_values[2], 80);
8073 }
8074}