1use crate::Any;
16use crate::pbold;
17use crate::scalar::expression::{SargableQueryParser, ScalarQueryParser};
18use crate::scalar::registry::{
19 ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
20};
21use crate::scalar::{
22 BuiltinIndexType, CreatedIndex, SargableQuery, ScalarIndexParams, UpdateCriteria,
23 compute_next_prefix,
24};
25use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
26use datafusion_expr::Accumulator;
27use lance_core::cache::{LanceCache, WeakLanceCache};
28use serde::{Deserialize, Serialize};
29use std::sync::LazyLock;
30
31use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array};
32use arrow_schema::{DataType, Field};
33use datafusion::execution::SendableRecordBatchStream;
34use datafusion_common::ScalarValue;
35use std::{collections::HashMap, sync::Arc};
36
37use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
38use crate::scalar::FragReuseIndex;
39use crate::vector::VectorIndex;
40use crate::{Index, IndexType};
41use async_trait::async_trait;
42use deepsize::DeepSizeOf;
43use lance_core::Error;
44use lance_core::Result;
45use roaring::RoaringBitmap;
46
47use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones};
/// Number of rows summarized by a single zone when nothing else is configured.
const ROWS_PER_ZONE_DEFAULT: u64 = 8 * 1024;
/// Name of the file inside the index store that holds the serialized zone statistics.
const ZONEMAP_FILENAME: &str = "zonemap.lance";
/// Schema-metadata key under which the zone size is persisted next to the statistics.
const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone";
/// Version number reported for zone map indices created by this module.
const ZONEMAP_INDEX_VERSION: u32 = 0;
53
54#[derive(Debug, PartialEq, Clone)]
56struct ZoneMapStatistics {
57 min: ScalarValue,
58 max: ScalarValue,
59 null_count: u32,
60 nan_count: u32,
62 bound: ZoneBound,
65}
66
67impl DeepSizeOf for ZoneMapStatistics {
68 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
69 let min_size = self.min.size() - std::mem::size_of::<ScalarValue>();
71 let max_size = self.max.size() - std::mem::size_of::<ScalarValue>();
72
73 min_size + max_size
74 }
75}
76
77impl AsRef<ZoneBound> for ZoneMapStatistics {
78 fn as_ref(&self) -> &ZoneBound {
79 &self.bound
80 }
81}
82
83pub struct ZoneMapIndex {
105 zones: Vec<ZoneMapStatistics>,
106 data_type: DataType,
107 rows_per_zone: u64,
109 store: Arc<dyn IndexStore>,
110 fri: Option<Arc<FragReuseIndex>>,
111 index_cache: WeakLanceCache,
112}
113
114impl std::fmt::Debug for ZoneMapIndex {
115 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116 f.debug_struct("ZoneMapIndex")
117 .field("zones", &self.zones)
118 .field("data_type", &self.data_type)
119 .field("rows_per_zone", &self.rows_per_zone)
120 .field("store", &self.store)
121 .field("fri", &self.fri)
122 .field("index_cache", &self.index_cache)
123 .finish()
124 }
125}
126
127impl DeepSizeOf for ZoneMapIndex {
128 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
129 self.zones.deep_size_of_children(context)
130 }
131}
132
133impl ZoneMapIndex {
134 fn evaluate_zone_against_query(
138 &self,
139 zone: &ZoneMapStatistics,
140 query: &SargableQuery,
141 ) -> Result<bool> {
142 use std::ops::Bound;
143
144 match query {
145 SargableQuery::IsNull() => {
146 Ok(zone.null_count > 0)
148 }
149 SargableQuery::Equals(target) => {
150 if target.is_null() {
153 return Ok(zone.null_count > 0);
154 }
155
156 let is_nan = match target {
158 ScalarValue::Float16(Some(f)) => f.is_nan(),
159 ScalarValue::Float32(Some(f)) => f.is_nan(),
160 ScalarValue::Float64(Some(f)) => f.is_nan(),
161 _ => false,
162 };
163
164 if is_nan {
165 return Ok(zone.nan_count > 0);
166 }
167
168 let min_check = target >= &zone.min;
171 let max_check = match &zone.max {
172 ScalarValue::Float16(Some(f)) if f.is_nan() => true,
173 ScalarValue::Float32(Some(f)) if f.is_nan() => true,
174 ScalarValue::Float64(Some(f)) if f.is_nan() => true,
175 _ => target <= &zone.max,
176 };
177 Ok(min_check && max_check)
178 }
179 SargableQuery::Range(start, end) => {
180 let zone_min = &zone.min;
183 let zone_max = &zone.max;
184
185 let start_check = match start {
186 Bound::Unbounded => true,
187 Bound::Included(s) => {
188 match s {
190 ScalarValue::Float16(Some(f)) => {
191 if f.is_nan() {
192 return Ok(zone.nan_count > 0);
193 }
194 }
195 ScalarValue::Float32(Some(f)) => {
196 if f.is_nan() {
197 return Ok(zone.nan_count > 0);
198 }
199 }
200 ScalarValue::Float64(Some(f)) => {
201 if f.is_nan() {
202 return Ok(zone.nan_count > 0);
203 }
204 }
205 _ => {}
206 }
207 match zone_max {
211 ScalarValue::Float16(Some(f)) if f.is_nan() => true,
212 ScalarValue::Float32(Some(f)) if f.is_nan() => true,
213 ScalarValue::Float64(Some(f)) if f.is_nan() => true,
214 _ => zone_max >= s,
215 }
216 }
217 Bound::Excluded(s) => {
218 match s {
220 ScalarValue::Float16(Some(f)) => {
221 if f.is_nan() {
222 return Ok(false); }
224 }
225 ScalarValue::Float32(Some(f)) => {
226 if f.is_nan() {
227 return Ok(false); }
229 }
230 ScalarValue::Float64(Some(f)) => {
231 if f.is_nan() {
232 return Ok(false); }
234 }
235 _ => {}
236 }
237 zone_max > s
238 }
239 };
240
241 let end_check = match end {
242 Bound::Unbounded => true,
243 Bound::Included(e) => {
244 match e {
246 ScalarValue::Float16(Some(f)) => {
247 if f.is_nan() {
248 return Ok(zone.nan_count > 0 || zone_min <= e);
250 }
251 }
252 ScalarValue::Float32(Some(f)) => {
253 if f.is_nan() {
254 return Ok(zone.nan_count > 0 || zone_min <= e);
255 }
256 }
257 ScalarValue::Float64(Some(f)) => {
258 if f.is_nan() {
259 return Ok(zone.nan_count > 0 || zone_min <= e);
260 }
261 }
262 _ => {}
263 }
264 zone_min <= e
265 }
266 Bound::Excluded(e) => {
267 match e {
269 ScalarValue::Float16(Some(f)) => {
270 if f.is_nan() {
271 return Ok(true);
273 }
274 }
275 ScalarValue::Float32(Some(f)) => {
276 if f.is_nan() {
277 return Ok(true);
278 }
279 }
280 ScalarValue::Float64(Some(f)) => {
281 if f.is_nan() {
282 return Ok(true);
283 }
284 }
285 _ => {}
286 }
287 zone_min < e
288 }
289 };
290
291 Ok(start_check && end_check)
292 }
293 SargableQuery::IsIn(values) => {
294 Ok(values.iter().any(|value| {
296 if value.is_null() {
297 zone.null_count > 0
298 } else {
299 match value {
300 ScalarValue::Float16(Some(f)) => {
301 if f.is_nan() {
302 zone.nan_count > 0
303 } else {
304 value >= &zone.min && value <= &zone.max
305 }
306 }
307 ScalarValue::Float32(Some(f)) => {
308 if f.is_nan() {
309 zone.nan_count > 0
310 } else {
311 value >= &zone.min && value <= &zone.max
312 }
313 }
314 ScalarValue::Float64(Some(f)) => {
315 if f.is_nan() {
316 zone.nan_count > 0
317 } else {
318 value >= &zone.min && value <= &zone.max
319 }
320 }
321 _ => value >= &zone.min && value <= &zone.max,
322 }
323 }
324 }))
325 }
326 SargableQuery::FullTextSearch(_) => Err(Error::not_supported_source(
327 "full text search is not supported for zonemap indexes".into(),
328 )),
329 SargableQuery::LikePrefix(prefix) => {
330 let prefix_str = match prefix {
340 ScalarValue::Utf8(Some(s)) => s.as_str(),
341 ScalarValue::LargeUtf8(Some(s)) => s.as_str(),
342 _ => return Ok(true), };
344
345 if prefix_str.is_empty() {
347 return Ok(true);
348 }
349
350 let max_check = &zone.max >= prefix;
352 if !max_check {
353 return Ok(false);
354 }
355
356 let next_prefix = compute_next_prefix(prefix_str);
359
360 match next_prefix {
361 Some(next) => {
362 let next_scalar = match prefix {
364 ScalarValue::Utf8(_) => ScalarValue::Utf8(Some(next)),
365 ScalarValue::LargeUtf8(_) => ScalarValue::LargeUtf8(Some(next)),
366 _ => return Ok(true),
367 };
368 Ok(zone.min < next_scalar)
369 }
370 None => {
371 Ok(true)
373 }
374 }
375 }
376 }
377 }
378
379 async fn load(
381 store: Arc<dyn IndexStore>,
382 fri: Option<Arc<FragReuseIndex>>,
383 index_cache: &LanceCache,
384 ) -> Result<Arc<Self>>
385 where
386 Self: Sized,
387 {
388 let index_file = store.open_index_file(ZONEMAP_FILENAME).await?;
389 let zone_maps = index_file
390 .read_range(0..index_file.num_rows(), None)
391 .await?;
392 let file_schema = index_file.schema();
393
394 let rows_per_zone: u64 = file_schema
395 .metadata
396 .get(ZONEMAP_SIZE_META_KEY)
397 .and_then(|bs| bs.parse().ok())
398 .unwrap_or(ROWS_PER_ZONE_DEFAULT);
399 Ok(Arc::new(Self::try_from_serialized(
400 zone_maps,
401 store,
402 fri,
403 index_cache,
404 rows_per_zone,
405 )?))
406 }
407
408 fn try_from_serialized(
409 data: RecordBatch,
410 store: Arc<dyn IndexStore>,
411 fri: Option<Arc<FragReuseIndex>>,
412 index_cache: &LanceCache,
413 rows_per_zone: u64,
414 ) -> Result<Self> {
415 let min_col = data
417 .column_by_name("min")
418 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'min' column"))?;
419 let max_col = data
420 .column_by_name("max")
421 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'max' column"))?;
422 let null_count_col = data
423 .column_by_name("null_count")
424 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'null_count' column"))?
425 .as_any()
426 .downcast_ref::<arrow_array::UInt32Array>()
427 .ok_or_else(|| {
428 Error::invalid_input("ZoneMapIndex: 'null_count' column is not UInt32")
429 })?;
430 let nan_count_col = data
431 .column_by_name("nan_count")
432 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'nan_count' column"))?
433 .as_any()
434 .downcast_ref::<arrow_array::UInt32Array>()
435 .ok_or_else(|| {
436 Error::invalid_input("ZoneMapIndex: 'nan_count' column is not UInt32")
437 })?;
438 let zone_length = data
439 .column_by_name("zone_length")
440 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'zone_length' column"))?
441 .as_any()
442 .downcast_ref::<arrow_array::UInt64Array>()
443 .ok_or_else(|| {
444 Error::invalid_input("ZoneMapIndex: 'zone_length' column is not UInt64")
445 })?;
446
447 let fragment_id_col = data
448 .column_by_name("fragment_id")
449 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'fragment_id' column"))?
450 .as_any()
451 .downcast_ref::<arrow_array::UInt64Array>()
452 .ok_or_else(|| {
453 Error::invalid_input("ZoneMapIndex: 'fragment_id' column is not UInt64")
454 })?;
455
456 let zone_start_col = data
457 .column_by_name("zone_start")
458 .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'zone_start' column"))?
459 .as_any()
460 .downcast_ref::<arrow_array::UInt64Array>()
461 .ok_or_else(|| {
462 Error::invalid_input("ZoneMapIndex: 'zone_start' column is not UInt64")
463 })?;
464
465 let data_type = min_col.data_type().clone();
466
467 if data.num_rows() == 0 {
468 return Ok(Self {
469 zones: Vec::new(),
470 data_type,
471 rows_per_zone,
472 store,
473 fri,
474 index_cache: WeakLanceCache::from(index_cache),
475 });
476 }
477
478 let num_zones = data.num_rows();
479 let mut zones = Vec::with_capacity(num_zones);
480
481 for i in 0..num_zones {
482 let min = ScalarValue::try_from_array(min_col, i)?;
483 let max = ScalarValue::try_from_array(max_col, i)?;
484 let null_count = null_count_col.value(i);
485 let nan_count = nan_count_col.value(i);
486 zones.push(ZoneMapStatistics {
487 min,
488 max,
489 null_count,
490 nan_count,
491 bound: ZoneBound {
492 fragment_id: fragment_id_col.value(i),
493 start: zone_start_col.value(i),
494 length: zone_length.value(i) as usize,
495 },
496 });
497 }
498
499 Ok(Self {
500 zones,
501 data_type,
502 rows_per_zone,
503 store,
504 fri,
505 index_cache: WeakLanceCache::from(index_cache),
506 })
507 }
508}
509
510#[async_trait]
511impl Index for ZoneMapIndex {
512 fn as_any(&self) -> &dyn Any {
513 self
514 }
515
516 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
517 self
518 }
519
520 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
521 Err(Error::invalid_input_source(
522 "ZoneMapIndex is not a vector index".into(),
523 ))
524 }
525
526 async fn prewarm(&self) -> Result<()> {
527 Ok(())
529 }
530
531 fn statistics(&self) -> Result<serde_json::Value> {
532 Ok(serde_json::json!({
533 "num_zones": self.zones.len(),
534 "rows_per_zone": self.rows_per_zone,
535 }))
536 }
537
538 fn index_type(&self) -> IndexType {
539 IndexType::ZoneMap
540 }
541
542 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
543 let mut frag_ids = RoaringBitmap::new();
544
545 for zone in &self.zones {
547 frag_ids.insert(zone.bound.fragment_id as u32);
548 }
549
550 Ok(frag_ids)
551 }
552}
553
554#[async_trait]
555impl ScalarIndex for ZoneMapIndex {
556 async fn search(
557 &self,
558 query: &dyn AnyQuery,
559 metrics: &dyn MetricsCollector,
560 ) -> Result<SearchResult> {
561 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
562 search_zones(&self.zones, metrics, |zone| {
563 self.evaluate_zone_against_query(zone, query)
564 })
565 }
566
567 fn can_remap(&self) -> bool {
568 false
569 }
570
571 async fn remap(
573 &self,
574 _mapping: &HashMap<u64, Option<u64>>,
575 _dest_store: &dyn IndexStore,
576 ) -> Result<CreatedIndex> {
577 Err(Error::invalid_input_source(
578 "ZoneMapIndex does not support remap".into(),
579 ))
580 }
581
582 async fn update(
584 &self,
585 new_data: SendableRecordBatchStream,
586 dest_store: &dyn IndexStore,
587 _old_data_filter: Option<super::OldIndexDataFilter>,
588 ) -> Result<CreatedIndex> {
589 let schema = new_data.schema();
591 let value_type = schema.field(0).data_type().clone();
592
593 let options = ZoneMapIndexBuilderParams::new(self.rows_per_zone);
594 let processor = ZoneMapProcessor::new(value_type.clone())?;
595 let trainer = ZoneTrainer::new(processor, self.rows_per_zone)?;
596 let updated_zones = rebuild_zones(&self.zones, trainer, new_data).await?;
597
598 let mut builder = ZoneMapIndexBuilder::try_new(options, self.data_type.clone())?;
600 builder.options.rows_per_zone = self.rows_per_zone;
601 builder.maps = updated_zones;
602 builder.write_index(dest_store).await?;
603
604 Ok(CreatedIndex {
605 index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default())
606 .unwrap(),
607 index_version: ZONEMAP_INDEX_VERSION,
608 files: Some(dest_store.list_files_with_sizes().await?),
609 })
610 }
611
612 fn update_criteria(&self) -> UpdateCriteria {
613 UpdateCriteria::only_new_data(
614 TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
615 )
616 }
617
618 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
619 let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?;
620 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(¶ms))
621 }
622}
623
624fn default_rows_per_zone() -> u64 {
625 *DEFAULT_ROWS_PER_ZONE
626}
627
628#[derive(Debug, Clone, Serialize, Deserialize)]
629pub struct ZoneMapIndexBuilderParams {
630 #[serde(default = "default_rows_per_zone")]
631 rows_per_zone: u64,
632}
633
634static DEFAULT_ROWS_PER_ZONE: LazyLock<u64> = LazyLock::new(|| {
635 std::env::var("LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE")
636 .unwrap_or_else(|_| (ROWS_PER_ZONE_DEFAULT).to_string())
637 .parse()
638 .expect("failed to parse LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE")
639});
640
641impl Default for ZoneMapIndexBuilderParams {
642 fn default() -> Self {
643 Self {
644 rows_per_zone: *DEFAULT_ROWS_PER_ZONE,
645 }
646 }
647}
648
649impl ZoneMapIndexBuilderParams {
650 pub fn new(rows_per_zone: u64) -> Self {
651 Self { rows_per_zone }
652 }
653
654 pub fn rows_per_zone(&self) -> u64 {
655 self.rows_per_zone
656 }
657}
658
659pub struct ZoneMapIndexBuilder {
661 options: ZoneMapIndexBuilderParams,
662
663 items_type: DataType,
664 maps: Vec<ZoneMapStatistics>,
665}
666
667impl ZoneMapIndexBuilder {
668 pub fn try_new(options: ZoneMapIndexBuilderParams, items_type: DataType) -> Result<Self> {
669 Ok(Self {
670 options,
671 items_type,
672 maps: Vec::new(),
673 })
674 }
675
676 pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> {
680 let processor = ZoneMapProcessor::new(self.items_type.clone())?;
681 let trainer = ZoneTrainer::new(processor, self.options.rows_per_zone)?;
682 self.maps = trainer.train(batches_source).await?;
683 Ok(())
684 }
685
686 fn zonemap_stats_as_batch(&self) -> Result<RecordBatch> {
687 let mins = if self.maps.is_empty() {
689 new_empty_array(&self.items_type)
690 } else {
691 ScalarValue::iter_to_array(self.maps.iter().map(|stat| stat.min.clone()))?
692 };
693 let maxs = if self.maps.is_empty() {
694 new_empty_array(&self.items_type)
695 } else {
696 ScalarValue::iter_to_array(self.maps.iter().map(|stat| stat.max.clone()))?
697 };
698 let null_counts =
699 UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.null_count));
700
701 let nan_counts = UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.nan_count));
702
703 let fragment_ids =
704 UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.fragment_id));
705
706 let zone_lengths =
707 UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.length as u64));
708
709 let zone_starts =
710 UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.start));
711
712 let schema = Arc::new(arrow_schema::Schema::new(vec![
713 Field::new("min", self.items_type.clone(), true),
715 Field::new("max", self.items_type.clone(), true),
716 Field::new("null_count", DataType::UInt32, false),
717 Field::new("nan_count", DataType::UInt32, false),
718 Field::new("fragment_id", DataType::UInt64, false),
719 Field::new("zone_start", DataType::UInt64, false),
720 Field::new("zone_length", DataType::UInt64, false),
721 ]));
722
723 let columns: Vec<ArrayRef> = vec![
724 mins,
725 maxs,
726 Arc::new(null_counts) as ArrayRef,
727 Arc::new(nan_counts) as ArrayRef,
728 Arc::new(fragment_ids) as ArrayRef,
729 Arc::new(zone_starts) as ArrayRef,
730 Arc::new(zone_lengths) as ArrayRef,
731 ];
732 Ok(RecordBatch::try_new(schema, columns)?)
733 }
734
735 pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<()> {
736 let record_batch = self.zonemap_stats_as_batch()?;
737
738 let mut file_schema = record_batch.schema().as_ref().clone();
739 file_schema.metadata.insert(
740 ZONEMAP_SIZE_META_KEY.to_string(),
741 self.options.rows_per_zone.to_string(),
742 );
743
744 let mut index_file = index_store
745 .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema))
746 .await?;
747 index_file.write_record_batch(record_batch).await?;
748 index_file.finish().await?;
749 Ok(())
750 }
751}
752
753struct ZoneMapProcessor {
756 data_type: DataType,
757 min: MinAccumulator,
758 max: MaxAccumulator,
759 null_count: u32,
760 nan_count: u32,
761}
762
763impl ZoneMapProcessor {
764 fn new(data_type: DataType) -> Result<Self> {
765 let min = MinAccumulator::try_new(&data_type)?;
766 let max = MaxAccumulator::try_new(&data_type)?;
767 Ok(Self {
768 data_type,
769 min,
770 max,
771 null_count: 0,
772 nan_count: 0,
773 })
774 }
775
776 fn count_nans(array: &ArrayRef) -> u32 {
777 match array.data_type() {
778 DataType::Float16 => {
779 let array = array
780 .as_any()
781 .downcast_ref::<arrow_array::Float16Array>()
782 .unwrap();
783 array.values().iter().filter(|&&x| x.is_nan()).count() as u32
784 }
785 DataType::Float32 => {
786 let array = array
787 .as_any()
788 .downcast_ref::<arrow_array::Float32Array>()
789 .unwrap();
790 array.values().iter().filter(|&&x| x.is_nan()).count() as u32
791 }
792 DataType::Float64 => {
793 let array = array
794 .as_any()
795 .downcast_ref::<arrow_array::Float64Array>()
796 .unwrap();
797 array.values().iter().filter(|&&x| x.is_nan()).count() as u32
798 }
799 _ => 0,
800 }
801 }
802}
803
804impl ZoneProcessor for ZoneMapProcessor {
805 type ZoneStatistics = ZoneMapStatistics;
806
807 fn process_chunk(&mut self, array: &ArrayRef) -> Result<()> {
808 self.null_count += array.null_count() as u32;
809 self.nan_count += Self::count_nans(array);
810 self.min.update_batch(std::slice::from_ref(array))?;
811 self.max.update_batch(std::slice::from_ref(array))?;
812 Ok(())
813 }
814
815 fn finish_zone(&mut self, bound: ZoneBound) -> Result<Self::ZoneStatistics> {
816 Ok(ZoneMapStatistics {
817 min: self.min.evaluate()?,
818 max: self.max.evaluate()?,
819 null_count: self.null_count,
820 nan_count: self.nan_count,
821 bound,
822 })
823 }
824
825 fn reset(&mut self) -> Result<()> {
826 self.min = MinAccumulator::try_new(&self.data_type)?;
827 self.max = MaxAccumulator::try_new(&self.data_type)?;
828 self.null_count = 0;
829 self.nan_count = 0;
830 Ok(())
831 }
832}
833
834#[derive(Debug, Default)]
835pub struct ZoneMapIndexPlugin;
836
837impl ZoneMapIndexPlugin {
838 async fn train_zonemap_index(
839 batches_source: SendableRecordBatchStream,
840 index_store: &dyn IndexStore,
841 options: Option<ZoneMapIndexBuilderParams>,
842 ) -> Result<()> {
843 let value_type = batches_source.schema().field(0).data_type().clone();
845
846 let mut builder = ZoneMapIndexBuilder::try_new(options.unwrap_or_default(), value_type)?;
847
848 builder.train(batches_source).await?;
849
850 builder.write_index(index_store).await?;
851 Ok(())
852 }
853}
854
855pub struct ZoneMapIndexTrainingRequest {
856 pub params: ZoneMapIndexBuilderParams,
857 pub criteria: TrainingCriteria,
858}
859
860impl ZoneMapIndexTrainingRequest {
861 pub fn new(params: ZoneMapIndexBuilderParams) -> Self {
862 Self {
863 params,
864 criteria: TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
865 }
866 }
867}
868
869impl TrainingRequest for ZoneMapIndexTrainingRequest {
870 fn as_any(&self) -> &dyn std::any::Any {
871 self
872 }
873 fn criteria(&self) -> &TrainingCriteria {
874 &self.criteria
875 }
876}
877
878#[async_trait]
879impl ScalarIndexPlugin for ZoneMapIndexPlugin {
880 fn name(&self) -> &str {
881 "ZoneMap"
882 }
883
884 fn new_training_request(
885 &self,
886 params: &str,
887 field: &Field,
888 ) -> Result<Box<dyn TrainingRequest>> {
889 if field.data_type().is_nested() {
890 return Err(Error::invalid_input_source(
891 "A zone map index can only be created on a non-nested field.".into(),
892 ));
893 }
894
895 let params = serde_json::from_str::<ZoneMapIndexBuilderParams>(params)?;
896
897 Ok(Box::new(ZoneMapIndexTrainingRequest::new(params)))
898 }
899
900 fn provides_exact_answer(&self) -> bool {
901 false
902 }
903
904 fn version(&self) -> u32 {
905 ZONEMAP_INDEX_VERSION
906 }
907
908 fn new_query_parser(
909 &self,
910 index_name: String,
911 _index_details: &prost_types::Any,
912 ) -> Option<Box<dyn ScalarQueryParser>> {
913 Some(Box::new(SargableQueryParser::new(index_name, true)))
914 }
915
916 async fn train_index(
917 &self,
918 data: SendableRecordBatchStream,
919 index_store: &dyn IndexStore,
920 request: Box<dyn TrainingRequest>,
921 _fragment_ids: Option<Vec<u32>>,
922 _progress: Arc<dyn crate::progress::IndexBuildProgress>,
923 ) -> Result<CreatedIndex> {
924 let request = (request as Box<dyn std::any::Any>)
925 .downcast::<ZoneMapIndexTrainingRequest>()
926 .map_err(|_| {
927 Error::invalid_input_source(
928 "must provide training request created by new_training_request".into(),
929 )
930 })?;
931 Self::train_zonemap_index(data, index_store, Some(request.params)).await?;
932 Ok(CreatedIndex {
933 index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default())
934 .unwrap(),
935 index_version: ZONEMAP_INDEX_VERSION,
936 files: Some(index_store.list_files_with_sizes().await?),
937 })
938 }
939
940 async fn load_index(
941 &self,
942 index_store: Arc<dyn IndexStore>,
943 _index_details: &prost_types::Any,
944 frag_reuse_index: Option<Arc<FragReuseIndex>>,
945 cache: &LanceCache,
946 ) -> Result<Arc<dyn ScalarIndex>> {
947 Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
948 }
949}
950
951#[cfg(test)]
952mod tests {
953 use crate::scalar::registry::VALUE_COLUMN_NAME;
954 use crate::scalar::{IndexStore, zonemap::ROWS_PER_ZONE_DEFAULT};
955 use std::sync::Arc;
956
957 use crate::scalar::zoned::ZoneBound;
958 use crate::scalar::zonemap::{ZoneMapIndexPlugin, ZoneMapStatistics};
959 use arrow::datatypes::Float32Type;
960 use arrow_array::{Array, RecordBatch, UInt64Array, record_batch};
961 use arrow_schema::{DataType, Field, Schema};
962 use datafusion::execution::SendableRecordBatchStream;
963 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
964 use datafusion_common::ScalarValue;
965 use futures::{StreamExt, TryStreamExt, stream};
966 use lance_core::utils::mask::NullableRowAddrSet;
967 use lance_core::utils::tempfile::TempObjDir;
968 use lance_core::{
969 ROW_ADDR,
970 cache::{LanceCache, WeakLanceCache},
971 utils::mask::RowAddrTreeMap,
972 };
973 use lance_datafusion::datagen::DatafusionDatagenExt;
974 use lance_datagen::ArrayGeneratorExt;
975 use lance_datagen::{BatchCount, RowCount, array};
976 use lance_io::object_store::ObjectStore;
977
978 use crate::scalar::{
979 SargableQuery, ScalarIndex, SearchResult,
980 lance_format::LanceIndexStore,
981 zonemap::{
982 ZONEMAP_FILENAME, ZONEMAP_SIZE_META_KEY, ZoneMapIndex, ZoneMapIndexBuilderParams,
983 },
984 };
985
986 use crate::Index; use crate::metrics::NoOpMetricsCollector;
989 use roaring::RoaringBitmap; use std::collections::Bound;
991
992 fn add_row_addr(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
994 let schema = stream.schema();
995 let schema_with_row_addr = Arc::new(Schema::new(vec![
996 schema.field(0).clone(),
997 Field::new(ROW_ADDR, DataType::UInt64, false),
998 ]));
999 let schema = schema_with_row_addr.clone();
1000 let stream = stream.enumerate().map(move |(frag_id, batch)| {
1001 let batch = batch.unwrap();
1002 let row_addr = Arc::new(UInt64Array::from_iter_values(
1003 (0..batch.num_rows() as u64).map(|off| off + ((frag_id as u64) << 32)),
1004 ));
1005 Ok(RecordBatch::try_new(
1006 schema_with_row_addr.clone(),
1007 vec![batch.column(0).clone(), row_addr],
1008 )?)
1009 });
1010 Box::pin(RecordBatchStreamAdapter::new(schema, stream))
1011 }
1012
1013 #[tokio::test]
1014 async fn test_empty_zonemap_index() {
1015 let tmpdir = TempObjDir::default();
1016 let test_store = Arc::new(LanceIndexStore::new(
1017 Arc::new(ObjectStore::local()),
1018 tmpdir.clone(),
1019 Arc::new(LanceCache::no_cache()),
1020 ));
1021
1022 let data = arrow_array::Int32Array::from(Vec::<i32>::new());
1023 let row_ids = arrow_array::UInt64Array::from(Vec::<u64>::new());
1024 let schema = Arc::new(Schema::new(vec![
1025 Field::new(VALUE_COLUMN_NAME, DataType::Int32, false),
1026 Field::new(ROW_ADDR, DataType::UInt64, false),
1027 ]));
1028 let data =
1029 RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1030
1031 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1032 schema,
1033 stream::once(std::future::ready(Ok(data))),
1034 ));
1035
1036 ZoneMapIndexPlugin::train_zonemap_index(data_stream, test_store.as_ref(), None)
1037 .await
1038 .unwrap();
1039
1040 log::debug!("Successfully wrote the index file");
1041
1042 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1044 .await
1045 .expect("Failed to load ZoneMapIndex");
1046 assert_eq!(index.zones.len(), 0);
1047 assert_eq!(index.data_type, DataType::Int32);
1048 assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
1049
1050 let query = SargableQuery::Equals(ScalarValue::Int32(None));
1052 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1053 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1054 }
1055
1056 #[tokio::test]
1057 async fn test_null_zonemap_index() {
1059 let tmpdir = TempObjDir::default();
1060 let test_store = Arc::new(LanceIndexStore::new(
1061 Arc::new(ObjectStore::local()),
1062 tmpdir.clone(),
1063 Arc::new(LanceCache::no_cache()),
1064 ));
1065
1066 let stream = lance_datagen::gen_batch()
1067 .col(
1068 VALUE_COLUMN_NAME,
1069 array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
1070 )
1071 .into_df_stream(RowCount::from(5000), BatchCount::from(10));
1072
1073 let stream = add_row_addr(stream);
1075
1076 ZoneMapIndexPlugin::train_zonemap_index(
1077 stream,
1078 test_store.as_ref(),
1079 Some(ZoneMapIndexBuilderParams::new(5000)),
1080 )
1081 .await
1082 .unwrap();
1083
1084 log::debug!("Successfully wrote the index file");
1085
1086 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1088 .await
1089 .expect("Failed to load ZoneMapIndex");
1090 assert_eq!(index.zones.len(), 10);
1091 for (i, zone) in index.zones.iter().enumerate() {
1092 assert_eq!(zone.null_count, 1000);
1093 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1094 assert_eq!(zone.bound.length, 5000);
1095 assert_eq!(zone.bound.fragment_id, i as u64);
1096 }
1097
1098 let query = SargableQuery::Equals(ScalarValue::Int32(None));
1100 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1101
1102 let mut expected = RowAddrTreeMap::new();
1104 for fragment_id in 0..10 {
1105 let start = (fragment_id as u64) << 32;
1106 let end = start + 5000;
1107 expected.insert_range(start..end);
1108 }
1109 assert_eq!(result, SearchResult::at_most(expected));
1110
1111 let new_data =
1113 arrow_array::Float32Array::from_iter_values((0..5000).map(|i| i as f32 / 1000.0));
1114 let new_row_addr =
1116 UInt64Array::from_iter_values((0..5000).map(|i| (10u64 << 32) | (i as u64)));
1117 let new_schema = Arc::new(Schema::new(vec![
1118 Field::new(VALUE_COLUMN_NAME, DataType::Float32, false), Field::new(ROW_ADDR, DataType::UInt64, false), ]));
1121 let new_data_batch = RecordBatch::try_new(
1122 new_schema.clone(),
1123 vec![Arc::new(new_data), Arc::new(new_row_addr)],
1124 )
1125 .unwrap();
1126 let new_data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1127 new_schema,
1128 stream::once(std::future::ready(Ok(new_data_batch))),
1129 ));
1130
1131 index
1134 .update(new_data_stream, test_store.as_ref(), None)
1135 .await
1136 .unwrap();
1137
1138 let updated_index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1140 .await
1141 .expect("Failed to load updated ZoneMapIndex");
1142
1143 assert_eq!(updated_index.zones.len(), 11);
1145
1146 let new_zone = &updated_index.zones[10]; assert_eq!(new_zone.bound.fragment_id, 10u64); assert_eq!(new_zone.bound.length, 5000);
1150 assert_eq!(new_zone.null_count, 0); assert_eq!(new_zone.nan_count, 0); let query = SargableQuery::Equals(ScalarValue::Float32(None));
1155 let result = updated_index
1156 .search(&query, &NoOpMetricsCollector)
1157 .await
1158 .unwrap();
1159
1160 let mut expected = RowAddrTreeMap::new();
1162 for fragment_id in 0..10 {
1163 let start = (fragment_id as u64) << 32;
1164 let end = start + 5000;
1165 expected.insert_range(start..end);
1166 }
1167 assert_eq!(result, SearchResult::at_most(expected));
1168
1169 let query = SargableQuery::Equals(ScalarValue::Float32(Some(2.5))); let result = updated_index
1172 .search(&query, &NoOpMetricsCollector)
1173 .await
1174 .unwrap();
1175
1176 let mut expected = RowAddrTreeMap::new();
1178 let start = 10u64 << 32;
1179 let end = start + 5000;
1180 expected.insert_range(start..end);
1181 assert_eq!(result, SearchResult::at_most(expected));
1182 }
1183
1184 #[tokio::test]
1185 async fn test_zonemap_null_handling_in_queries() {
1186 let tmpdir = TempObjDir::default();
1188 let store = Arc::new(LanceIndexStore::new(
1189 Arc::new(ObjectStore::local()),
1190 tmpdir.clone(),
1191 Arc::new(LanceCache::no_cache()),
1192 ));
1193
1194 let batch = record_batch!(
1196 (VALUE_COLUMN_NAME, Int64, [Some(0), Some(5), None]),
1197 (ROW_ADDR, UInt64, [0, 1, 2])
1198 )
1199 .unwrap();
1200 let schema = batch.schema();
1201 let stream = stream::once(async move { Ok(batch) });
1202 let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1203
1204 ZoneMapIndexPlugin::train_zonemap_index(stream, store.as_ref(), None)
1206 .await
1207 .unwrap();
1208
1209 let cache = LanceCache::with_capacity(1024 * 1024);
1210 let index = ZoneMapIndex::load(store.clone(), None, &cache)
1211 .await
1212 .unwrap();
1213
1214 let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1217 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1218
1219 match result {
1220 SearchResult::AtMost(row_ids) => {
1221 let all_rows: Vec<u64> = row_ids
1224 .true_rows()
1225 .row_addrs()
1226 .unwrap()
1227 .map(u64::from)
1228 .collect();
1229 assert_eq!(
1230 all_rows,
1231 vec![0, 1, 2],
1232 "Should return all rows (including nulls) since ZoneMap is inexact"
1233 );
1234
1235 }
1238 _ => panic!("Expected AtMost search result from zonemap"),
1239 }
1240
1241 let query = SargableQuery::Range(
1243 std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1244 std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1245 );
1246 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1247
1248 match result {
1249 SearchResult::AtMost(row_ids) => {
1250 let all_rows: Vec<u64> = row_ids
1252 .true_rows()
1253 .row_addrs()
1254 .unwrap()
1255 .map(u64::from)
1256 .collect();
1257 assert_eq!(
1258 all_rows,
1259 vec![0, 1, 2],
1260 "Should return all rows in zone as possible matches"
1261 );
1262 }
1263 _ => panic!("Expected AtMost search result from zonemap"),
1264 }
1265 }
1266
1267 #[tokio::test]
1268 async fn test_nan_zonemap_index() {
1269 let tmpdir = TempObjDir::default();
1270 let test_store = Arc::new(LanceIndexStore::new(
1271 Arc::new(ObjectStore::local()),
1272 tmpdir.clone(),
1273 Arc::new(LanceCache::no_cache()),
1274 ));
1275
1276 let mut values = Vec::new();
1279 for i in 0..500 {
1280 if i % 5 == 2 {
1281 values.push(f32::NAN);
1282 } else {
1283 values.push(i as f32);
1285 }
1286 }
1287
1288 let float_data = arrow_array::Float32Array::from(values);
1289 let row_ids = UInt64Array::from_iter_values((0..float_data.len()).map(|i| i as u64));
1290 let schema = Arc::new(Schema::new(vec![
1291 Field::new(VALUE_COLUMN_NAME, DataType::Float32, true),
1292 Field::new(ROW_ADDR, DataType::UInt64, false),
1293 ]));
1294 let data = RecordBatch::try_new(
1295 schema.clone(),
1296 vec![Arc::new(float_data.clone()), Arc::new(row_ids)],
1297 )
1298 .unwrap();
1299 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1300 schema,
1301 stream::once(std::future::ready(Ok(data))),
1302 ));
1303
1304 ZoneMapIndexPlugin::train_zonemap_index(
1305 data_stream,
1306 test_store.as_ref(),
1307 Some(ZoneMapIndexBuilderParams::new(100)),
1308 )
1309 .await
1310 .unwrap();
1311
1312 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1314 .await
1315 .expect("Failed to load ZoneMapIndex");
1316
1317 assert_eq!(index.zones.len(), 5);
1319
1320 for (i, zone) in index.zones.iter().enumerate() {
1324 assert_eq!(zone.nan_count, 20, "Zone {} should have 20 NaN values", i);
1325 assert_eq!(
1326 zone.bound.length, 100,
1327 "Zone {} should have zone_length 100",
1328 i
1329 );
1330 assert_eq!(
1331 zone.bound.fragment_id, 0u64,
1332 "Zone {} should have fragment_id 0",
1333 i
1334 );
1335 }
1336
1337 let query = SargableQuery::Equals(ScalarValue::Float32(Some(f32::NAN)));
1339 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1340
1341 let mut expected = RowAddrTreeMap::new();
1343 expected.insert_range(0..500); assert_eq!(result, SearchResult::at_most(expected));
1345
1346 let query = SargableQuery::Equals(ScalarValue::Float32(Some(5.0)));
1348 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1349
1350 let mut expected = RowAddrTreeMap::new();
1352 expected.insert_range(0..100);
1353 assert_eq!(result, SearchResult::at_most(expected));
1354
1355 let query = SargableQuery::Equals(ScalarValue::Float32(Some(1000.0)));
1357 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1358
1359 let mut expected = RowAddrTreeMap::new();
1362 expected.insert_range(0..500);
1363 assert_eq!(result, SearchResult::at_most(expected));
1364
1365 let query = SargableQuery::Range(
1367 Bound::Included(ScalarValue::Float32(Some(0.0))),
1368 Bound::Included(ScalarValue::Float32(Some(250.0))),
1369 );
1370 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1371
1372 let mut expected = RowAddrTreeMap::new();
1374 expected.insert_range(0..300);
1375 assert_eq!(result, SearchResult::at_most(expected));
1376
1377 let query = SargableQuery::IsIn(vec![
1379 ScalarValue::Float32(Some(f32::NAN)),
1380 ScalarValue::Float32(Some(5.0)),
1381 ScalarValue::Float32(Some(150.0)), ]);
1383 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1384
1385 let mut expected = RowAddrTreeMap::new();
1387 expected.insert_range(0..500);
1388 assert_eq!(result, SearchResult::at_most(expected));
1389
1390 let query = SargableQuery::Range(
1392 Bound::Included(ScalarValue::Float32(Some(1000.0))),
1393 Bound::Included(ScalarValue::Float32(Some(2000.0))),
1394 );
1395 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1396
1397 let mut expected = RowAddrTreeMap::new();
1400 expected.insert_range(0..500);
1401 assert_eq!(result, SearchResult::at_most(expected));
1402
1403 let query = SargableQuery::IsNull();
1405 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1406 assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty()));
1407
1408 let query = SargableQuery::Range(
1411 Bound::Included(ScalarValue::Float32(Some(f32::NAN))),
1412 Bound::Unbounded,
1413 );
1414 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1415 let mut expected = RowAddrTreeMap::new();
1417 expected.insert_range(0..500);
1418 assert_eq!(result, SearchResult::at_most(expected));
1419
1420 let query = SargableQuery::Range(
1422 Bound::Unbounded,
1423 Bound::Included(ScalarValue::Float32(Some(f32::NAN))),
1424 );
1425 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1426 let mut expected = RowAddrTreeMap::new();
1428 expected.insert_range(0..500);
1429 assert_eq!(result, SearchResult::at_most(expected));
1430
1431 let query = SargableQuery::Range(
1433 Bound::Unbounded,
1434 Bound::Excluded(ScalarValue::Float32(Some(f32::NAN))),
1435 );
1436 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1437 let mut expected = RowAddrTreeMap::new();
1439 expected.insert_range(0..500);
1440 assert_eq!(result, SearchResult::at_most(expected));
1441
1442 let query = SargableQuery::Range(
1444 Bound::Excluded(ScalarValue::Float32(Some(f32::NAN))),
1445 Bound::Unbounded,
1446 );
1447 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1448 assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty()));
1450
1451 let query = SargableQuery::IsIn(vec![
1453 ScalarValue::Float16(Some(half::f16::NAN)),
1454 ScalarValue::Float32(Some(f32::NAN)),
1455 ScalarValue::Float64(Some(f64::NAN)),
1456 ScalarValue::Float32(Some(5.0)),
1457 ]);
1458 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1459 let mut expected = RowAddrTreeMap::new();
1461 expected.insert_range(0..500);
1462 assert_eq!(result, SearchResult::at_most(expected));
1463 }
1464
1465 #[tokio::test]
1466 async fn test_basic_zonemap_index() {
1468 let tmpdir = TempObjDir::default();
1469 let test_store = Arc::new(LanceIndexStore::new(
1470 Arc::new(ObjectStore::local()),
1471 tmpdir.clone(),
1472 Arc::new(LanceCache::no_cache()),
1473 ));
1474
1475 let data = arrow_array::Int32Array::from_iter_values(0..=100);
1476 let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64));
1477 let schema = Arc::new(Schema::new(vec![
1478 Field::new(VALUE_COLUMN_NAME, DataType::Int32, false),
1479 Field::new(ROW_ADDR, DataType::UInt64, false),
1480 ]));
1481 let data =
1482 RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1483 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1484 schema,
1485 stream::once(std::future::ready(Ok(data))),
1486 ));
1487
1488 ZoneMapIndexPlugin::train_zonemap_index(
1489 data_stream,
1490 test_store.as_ref(),
1491 Some(ZoneMapIndexBuilderParams::new(100)),
1492 )
1493 .await
1494 .unwrap();
1495
1496 log::debug!("Successfully wrote the index file");
1497
1498 let index_file = test_store.open_index_file(ZONEMAP_FILENAME).await.unwrap();
1500 let metadata = index_file.schema().metadata.clone();
1502 let record_batch = index_file
1503 .read_record_batch(0, index_file.num_rows() as u64)
1504 .await
1505 .unwrap();
1506 assert_eq!(record_batch.num_rows(), 2);
1507 assert_eq!(
1508 record_batch
1509 .column(0)
1510 .as_any()
1511 .downcast_ref::<arrow_array::Int32Array>()
1512 .unwrap()
1513 .values(),
1514 &[0, 100]
1515 );
1516 assert_eq!(
1517 record_batch
1518 .column(1)
1519 .as_any()
1520 .downcast_ref::<arrow_array::Int32Array>()
1521 .unwrap()
1522 .values(),
1523 &[99, 100]
1524 );
1525 assert_eq!(
1526 record_batch
1527 .column(2)
1528 .as_any()
1529 .downcast_ref::<arrow_array::UInt32Array>()
1530 .unwrap()
1531 .values(),
1532 &[0, 0]
1533 );
1534 assert_eq!(metadata.get(ZONEMAP_SIZE_META_KEY).unwrap(), "100");
1535
1536 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1538 .await
1539 .expect("Failed to load ZoneMapIndex");
1540 assert_eq!(index.zones.len(), 2);
1541 assert_eq!(
1542 index.zones,
1543 vec![
1544 ZoneMapStatistics {
1545 min: ScalarValue::Int32(Some(0)),
1546 max: ScalarValue::Int32(Some(99)),
1547 null_count: 0,
1548 nan_count: 0,
1549 bound: ZoneBound {
1550 fragment_id: 0,
1551 start: 0,
1552 length: 100,
1553 },
1554 },
1555 ZoneMapStatistics {
1556 min: ScalarValue::Int32(Some(100)),
1557 max: ScalarValue::Int32(Some(100)),
1558 null_count: 0,
1559 nan_count: 0,
1560 bound: ZoneBound {
1561 fragment_id: 0,
1562 start: 100,
1563 length: 1,
1564 },
1565 }
1566 ]
1567 );
1568 for (i, zone) in index.zones.iter().enumerate() {
1570 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1571 }
1572
1573 assert_eq!(index.data_type, DataType::Int32);
1574 assert_eq!(index.rows_per_zone, 100);
1575 assert_eq!(
1576 index.calculate_included_frags().await.unwrap(),
1577 RoaringBitmap::from_iter(0..1)
1578 );
1579
1580 let query = SargableQuery::Range(
1584 Bound::Excluded(ScalarValue::Int32(Some(50))),
1585 Bound::Unbounded,
1586 );
1587 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1588 assert_eq!(result, SearchResult::at_most(0..=100));
1589
1590 let query = SargableQuery::Range(
1592 Bound::Included(ScalarValue::Int32(Some(0))),
1593 Bound::Included(ScalarValue::Int32(Some(50))),
1594 );
1595 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1596 assert_eq!(result, SearchResult::at_most(0..=99));
1597
1598 let query = SargableQuery::Range(
1600 Bound::Included(ScalarValue::Int32(Some(101))),
1601 Bound::Included(ScalarValue::Int32(Some(200))),
1602 );
1603 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1604 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1606
1607 let query = SargableQuery::Range(
1609 Bound::Included(ScalarValue::Int32(Some(100))),
1610 Bound::Included(ScalarValue::Int32(Some(100))),
1611 );
1612 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1613 assert_eq!(result, SearchResult::at_most(100..=100));
1614
1615 let query = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
1617 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1618 assert_eq!(result, SearchResult::at_most(0..=99));
1619
1620 let query = SargableQuery::Equals(ScalarValue::Int32(Some(100)));
1622 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1623 assert_eq!(result, SearchResult::at_most(100..=100));
1624
1625 let query = SargableQuery::Equals(ScalarValue::Int32(Some(101)));
1627 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1628 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1629
1630 let query = SargableQuery::IsNull();
1632 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1633 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1634 let query = SargableQuery::IsIn(vec![
1636 ScalarValue::Int32(Some(0)),
1637 ScalarValue::Int32(Some(100)),
1638 ScalarValue::Int32(Some(101)),
1639 ScalarValue::Int32(Some(50)),
1640 ]);
1641 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1642 assert_eq!(result, SearchResult::at_most(0..=100));
1644
1645 let query = SargableQuery::IsIn(vec![
1647 ScalarValue::Int32(Some(101)),
1648 ScalarValue::Int32(Some(102)),
1649 ]);
1650 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1651 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1652
1653 let query = SargableQuery::IsIn(vec![ScalarValue::Int32(None)]);
1655 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1656 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1657
1658 let query = SargableQuery::Equals(ScalarValue::Int32(None));
1660 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1661 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1662 }
1663
1664 #[tokio::test]
1665 async fn test_complex_zonemap_index() {
1667 let tmpdir = TempObjDir::default();
1668 let test_store = Arc::new(LanceIndexStore::new(
1669 Arc::new(ObjectStore::local()),
1670 tmpdir.clone(),
1671 Arc::new(LanceCache::no_cache()),
1672 ));
1673
1674 let data =
1676 arrow_array::Int64Array::from_iter_values(0..(ROWS_PER_ZONE_DEFAULT * 2 + 42) as i64);
1677 let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64));
1678 let schema = Arc::new(Schema::new(vec![
1679 Field::new(VALUE_COLUMN_NAME, DataType::Int64, false),
1680 Field::new(ROW_ADDR, DataType::UInt64, false),
1681 ]));
1682 let data =
1683 RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1684 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1685 schema,
1686 stream::once(std::future::ready(Ok(data))),
1687 ));
1688
1689 ZoneMapIndexPlugin::train_zonemap_index(
1690 data_stream,
1691 test_store.as_ref(),
1692 Some(ZoneMapIndexBuilderParams::default()),
1693 )
1694 .await
1695 .unwrap();
1696
1697 log::debug!("Successfully wrote the index file");
1698
1699 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1701 .await
1702 .expect("Failed to load ZoneMapIndex");
1703 assert_eq!(index.zones.len(), 3);
1704 assert_eq!(
1705 index.zones,
1706 vec![
1707 ZoneMapStatistics {
1708 min: ScalarValue::Int64(Some(0)),
1709 max: ScalarValue::Int64(Some(8191)),
1710 null_count: 0,
1711 nan_count: 0,
1712 bound: ZoneBound {
1713 fragment_id: 0,
1714 start: 0,
1715 length: 8192,
1716 },
1717 },
1718 ZoneMapStatistics {
1719 min: ScalarValue::Int64(Some(8192)),
1720 max: ScalarValue::Int64(Some(16383)),
1721 null_count: 0,
1722 nan_count: 0,
1723 bound: ZoneBound {
1724 fragment_id: 0,
1725 start: 8192,
1726 length: 8192,
1727 },
1728 },
1729 ZoneMapStatistics {
1730 min: ScalarValue::Int64(Some(16384)),
1731 max: ScalarValue::Int64(Some(16425)),
1732 null_count: 0,
1733 nan_count: 0,
1734 bound: ZoneBound {
1735 fragment_id: 0,
1736 start: 16384,
1737 length: 42,
1738 },
1739 }
1740 ]
1741 );
1742 for (i, zone) in index.zones.iter().enumerate() {
1744 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1745 }
1746
1747 assert_eq!(index.data_type, DataType::Int64);
1748 assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
1749 assert_eq!(
1750 index.calculate_included_frags().await.unwrap(),
1751 RoaringBitmap::from_iter(0..1)
1752 );
1753
1754 let query = SargableQuery::Equals(ScalarValue::Int64(Some(1000)));
1759 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1760 let mut expected = RowAddrTreeMap::new();
1762 expected.insert_range(0..=8191);
1763 assert_eq!(result, SearchResult::at_most(expected));
1764
1765 let query = SargableQuery::Equals(ScalarValue::Int64(Some(9000)));
1767 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1768 let mut expected = RowAddrTreeMap::new();
1770 expected.insert_range(8192..=16383);
1771 assert_eq!(result, SearchResult::at_most(expected));
1772
1773 let query = SargableQuery::Equals(ScalarValue::Int64(Some(20000)));
1775 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1776 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1777
1778 let query = SargableQuery::Range(
1780 Bound::Included(ScalarValue::Int64(Some(9000))),
1781 Bound::Included(ScalarValue::Int64(Some(16400))),
1782 );
1783 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1784 let mut expected = RowAddrTreeMap::new();
1786 expected.insert_range(8192..=16425);
1787 assert_eq!(result, SearchResult::at_most(expected));
1788 }
1789
1790 #[tokio::test]
1791 async fn test_multiple_fragments_zonemap() {
1793 let tmpdir = TempObjDir::default();
1794 let test_store = Arc::new(LanceIndexStore::new(
1795 Arc::new(ObjectStore::local()),
1796 tmpdir.clone(),
1797 Arc::new(LanceCache::no_cache()),
1798 ));
1799
1800 let schema = Arc::new(Schema::new(vec![
1801 Field::new(VALUE_COLUMN_NAME, DataType::Int64, false),
1802 Field::new(ROW_ADDR, DataType::UInt64, false),
1803 ]));
1804
1805 let fragment0_data =
1808 arrow_array::Int64Array::from_iter_values(0..ROWS_PER_ZONE_DEFAULT as i64);
1809 let fragment0_row_ids = UInt64Array::from_iter_values(0..ROWS_PER_ZONE_DEFAULT);
1810 let fragment0_batch = RecordBatch::try_new(
1811 schema.clone(),
1812 vec![Arc::new(fragment0_data), Arc::new(fragment0_row_ids)],
1813 )
1814 .unwrap();
1815
1816 let fragment1_data = arrow_array::Int64Array::from_iter_values(
1818 (ROWS_PER_ZONE_DEFAULT as i64)..((ROWS_PER_ZONE_DEFAULT * 2) as i64),
1819 );
1820 let fragment1_row_ids =
1821 UInt64Array::from_iter_values((0..ROWS_PER_ZONE_DEFAULT).map(|i| i + (1 << 32)));
1822 let fragment1_batch = RecordBatch::try_new(
1823 schema.clone(),
1824 vec![Arc::new(fragment1_data), Arc::new(fragment1_row_ids)],
1825 )
1826 .unwrap();
1827
1828 let fragment2_data = arrow_array::Int64Array::from_iter_values(
1830 ((ROWS_PER_ZONE_DEFAULT * 2) as i64)..((ROWS_PER_ZONE_DEFAULT * 2 + 42) as i64),
1831 );
1832 let fragment2_row_ids =
1833 UInt64Array::from_iter_values((0..42).map(|i| (i as u64) + (2 << 32)));
1834 let fragment2_batch = RecordBatch::try_new(
1835 schema.clone(),
1836 vec![Arc::new(fragment2_data), Arc::new(fragment2_row_ids)],
1837 )
1838 .unwrap();
1839
1840 {
1842 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1844 schema.clone(),
1845 stream::iter(vec![
1846 Ok(fragment0_batch.clone()),
1847 Ok(fragment1_batch.clone()),
1848 Ok(fragment2_batch.clone()),
1849 ]),
1850 ));
1851 ZoneMapIndexPlugin::train_zonemap_index(
1852 data_stream,
1853 test_store.as_ref(),
1854 Some(ZoneMapIndexBuilderParams::new(5000)),
1855 )
1856 .await
1857 .unwrap();
1858
1859 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1861 .await
1862 .expect("Failed to load ZoneMapIndex");
1863 assert_eq!(index.zones.len(), 5);
1864 assert_eq!(
1865 index.zones,
1866 vec![
1867 ZoneMapStatistics {
1868 min: ScalarValue::Int64(Some(0)),
1869 max: ScalarValue::Int64(Some(4999)),
1870 null_count: 0,
1871 nan_count: 0,
1872 bound: ZoneBound {
1873 fragment_id: 0,
1874 start: 0,
1875 length: 5000,
1876 },
1877 },
1878 ZoneMapStatistics {
1879 min: ScalarValue::Int64(Some(5000)),
1880 max: ScalarValue::Int64(Some(8191)),
1881 null_count: 0,
1882 nan_count: 0,
1883 bound: ZoneBound {
1884 fragment_id: 0,
1885 start: 5000,
1886 length: 3192,
1887 },
1888 },
1889 ZoneMapStatistics {
1890 min: ScalarValue::Int64(Some(8192)),
1891 max: ScalarValue::Int64(Some(13191)),
1892 null_count: 0,
1893 nan_count: 0,
1894 bound: ZoneBound {
1895 fragment_id: 1,
1896 start: 0,
1897 length: 5000,
1898 },
1899 },
1900 ZoneMapStatistics {
1901 min: ScalarValue::Int64(Some(13192)),
1902 max: ScalarValue::Int64(Some(16383)),
1903 null_count: 0,
1904 nan_count: 0,
1905 bound: ZoneBound {
1906 fragment_id: 1,
1907 start: 5000,
1908 length: 3192,
1909 },
1910 },
1911 ZoneMapStatistics {
1912 min: ScalarValue::Int64(Some(16384)),
1913 max: ScalarValue::Int64(Some(16425)),
1914 null_count: 0,
1915 nan_count: 0,
1916 bound: ZoneBound {
1917 fragment_id: 2,
1918 start: 0,
1919 length: 42,
1920 },
1921 }
1922 ]
1923 );
1924 for (i, zone) in index.zones.iter().enumerate() {
1926 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1927 }
1928
1929 assert_eq!(index.data_type, DataType::Int64);
1930 assert_eq!(index.rows_per_zone, 5000);
1931 assert_eq!(
1932 index.calculate_included_frags().await.unwrap(),
1933 RoaringBitmap::from_iter(0..3)
1934 );
1935
1936 let verify_data_stream: SendableRecordBatchStream =
1938 Box::pin(RecordBatchStreamAdapter::new(
1939 schema.clone(),
1940 stream::iter(vec![
1941 Ok(fragment0_batch.clone()),
1942 Ok(fragment1_batch.clone()),
1943 Ok(fragment2_batch.clone()),
1944 ]),
1945 ));
1946 let batches: Vec<RecordBatch> = verify_data_stream.try_collect().await.unwrap();
1947
1948 assert_eq!(batches.len(), 3);
1949
1950 let fragment0_rowaddr_col = batches[0].column_by_name(ROW_ADDR).unwrap();
1952 let fragment0_rowaddrs = fragment0_rowaddr_col
1953 .as_any()
1954 .downcast_ref::<UInt64Array>()
1955 .unwrap();
1956 assert_eq!(
1957 fragment0_rowaddrs.values().len(),
1958 ROWS_PER_ZONE_DEFAULT as usize
1959 );
1960 assert_eq!(fragment0_rowaddrs.values()[0], 0);
1961 assert_eq!(
1962 fragment0_rowaddrs.values()[fragment0_rowaddrs.values().len() - 1],
1963 8191
1964 );
1965
1966 let fragment1_rowaddr_col = batches[1].column_by_name(ROW_ADDR).unwrap();
1968 let fragment1_rowaddrs = fragment1_rowaddr_col
1969 .as_any()
1970 .downcast_ref::<UInt64Array>()
1971 .unwrap();
1972 assert_eq!(
1973 fragment1_rowaddrs.values().len(),
1974 ROWS_PER_ZONE_DEFAULT as usize
1975 );
1976 assert_eq!(fragment1_rowaddrs.values()[0], 1u64 << 32); assert_eq!(
1978 fragment1_rowaddrs.values()[fragment1_rowaddrs.values().len() - 1],
1979 8191 | (1u64 << 32)
1980 );
1981
1982 let fragment2_rowaddr_col = batches[2].column_by_name(ROW_ADDR).unwrap();
1984 let fragment2_rowaddrs = fragment2_rowaddr_col
1985 .as_any()
1986 .downcast_ref::<UInt64Array>()
1987 .unwrap();
1988 assert_eq!(fragment2_rowaddrs.values().len(), 42);
1989 assert_eq!(fragment2_rowaddrs.values()[0], 2u64 << 32); assert_eq!(
1991 fragment2_rowaddrs.values()[fragment2_rowaddrs.values().len() - 1],
1992 (2u64 << 32) | 41
1993 );
1994
1995 let query = SargableQuery::Range(
1999 Bound::Included(ScalarValue::Int64(Some(5000))),
2000 Bound::Included(ScalarValue::Int64(Some(12000))),
2001 );
2002 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2003 let mut expected = RowAddrTreeMap::new();
2005 expected.insert_range(5000..8192);
2007 expected.insert_range((1u64 << 32)..((1u64 << 32) + 5000));
2009 assert_eq!(result, SearchResult::at_most(expected));
2010
2011 let query = SargableQuery::Equals(ScalarValue::Int64(Some(8192)));
2013 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2014 let mut expected = RowAddrTreeMap::new();
2016 expected.insert_range((1u64 << 32)..((1u64 << 32) + 5000));
2017 assert_eq!(result, SearchResult::at_most(expected));
2018
2019 let query = SargableQuery::Equals(ScalarValue::Int64(Some(16385)));
2021 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2022 let mut expected = RowAddrTreeMap::new();
2024 expected.insert_range(2u64 << 32..((2u64 << 32) + 42));
2025 assert_eq!(result, SearchResult::at_most(expected));
2026
2027 let query = SargableQuery::Equals(ScalarValue::Int64(Some(99999)));
2029 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2030 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
2031
2032 let query = SargableQuery::IsIn(vec![ScalarValue::Int64(Some(16385))]);
2034 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2035 let mut expected = RowAddrTreeMap::new();
2036 expected.insert_range(2u64 << 32..((2u64 << 32) + 42));
2037 assert_eq!(result, SearchResult::at_most(expected));
2038
2039 let query = SargableQuery::Equals(ScalarValue::Int64(None));
2041 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2042 let mut expected = RowAddrTreeMap::new();
2043 expected.insert_range(0..=16425);
2044 assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
2046 }
2047
2048 {
2050 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2052 schema.clone(),
2053 stream::iter(vec![
2054 Ok(fragment0_batch.clone()),
2055 Ok(fragment1_batch.clone()),
2056 Ok(fragment2_batch.clone()),
2057 ]),
2058 ));
2059 ZoneMapIndexPlugin::train_zonemap_index(
2060 data_stream,
2061 test_store.as_ref(),
2062 Some(ZoneMapIndexBuilderParams::default()),
2063 )
2064 .await
2065 .unwrap();
2066
2067 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2069 .await
2070 .expect("Failed to load ZoneMapIndex");
2071 assert_eq!(index.zones.len(), 3);
2072 assert_eq!(
2073 index.zones,
2074 vec![
2075 ZoneMapStatistics {
2076 min: ScalarValue::Int64(Some(0)),
2077 max: ScalarValue::Int64(Some(8191)),
2078 null_count: 0,
2079 nan_count: 0,
2080 bound: ZoneBound {
2081 fragment_id: 0,
2082 start: 0,
2083 length: 8192,
2084 },
2085 },
2086 ZoneMapStatistics {
2087 min: ScalarValue::Int64(Some(8192)),
2088 max: ScalarValue::Int64(Some(16383)),
2089 null_count: 0,
2090 nan_count: 0,
2091 bound: ZoneBound {
2092 fragment_id: 1,
2093 start: 0,
2094 length: 8192,
2095 },
2096 },
2097 ZoneMapStatistics {
2098 min: ScalarValue::Int64(Some(16384)),
2099 max: ScalarValue::Int64(Some(16425)),
2100 null_count: 0,
2101 nan_count: 0,
2102 bound: ZoneBound {
2103 fragment_id: 2,
2104 start: 0,
2105 length: 42,
2106 },
2107 }
2108 ]
2109 );
2110 for (i, zone) in index.zones.iter().enumerate() {
2112 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
2113 }
2114
2115 assert_eq!(index.data_type, DataType::Int64);
2116 assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
2117 assert_eq!(
2118 index.calculate_included_frags().await.unwrap(),
2119 RoaringBitmap::from_iter(0..3)
2120 );
2121 }
2122
2123 {
2125 let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2127 schema.clone(),
2128 stream::iter(vec![
2129 Ok(fragment0_batch.clone()),
2130 Ok(fragment1_batch.clone()),
2131 Ok(fragment2_batch.clone()),
2132 ]),
2133 ));
2134 ZoneMapIndexPlugin::train_zonemap_index(
2135 data_stream,
2136 test_store.as_ref(),
2137 Some(ZoneMapIndexBuilderParams::new(ROWS_PER_ZONE_DEFAULT * 3)),
2138 )
2139 .await
2140 .unwrap();
2141
2142 let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2144 .await
2145 .expect("Failed to load ZoneMapIndex");
2146 assert_eq!(index.zones.len(), 3);
2147 assert_eq!(
2148 index.zones,
2149 vec![
2150 ZoneMapStatistics {
2151 min: ScalarValue::Int64(Some(0)),
2152 max: ScalarValue::Int64(Some(8191)),
2153 null_count: 0,
2154 nan_count: 0,
2155 bound: ZoneBound {
2156 fragment_id: 0,
2157 start: 0,
2158 length: 8192,
2159 },
2160 },
2161 ZoneMapStatistics {
2162 min: ScalarValue::Int64(Some(8192)),
2163 max: ScalarValue::Int64(Some(16383)),
2164 null_count: 0,
2165 nan_count: 0,
2166 bound: ZoneBound {
2167 fragment_id: 1,
2168 start: 0,
2169 length: 8192,
2170 },
2171 },
2172 ZoneMapStatistics {
2173 min: ScalarValue::Int64(Some(16384)),
2174 max: ScalarValue::Int64(Some(16425)),
2175 null_count: 0,
2176 nan_count: 0,
2177 bound: ZoneBound {
2178 fragment_id: 2,
2179 start: 0,
2180 length: 42,
2181 },
2182 }
2183 ]
2184 );
2185 for (i, zone) in index.zones.iter().enumerate() {
2187 assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
2188 }
2189
2190 assert_eq!(index.data_type, DataType::Int64);
2191 assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT * 3);
2192 }
2193 }
2194
2195 #[tokio::test]
2196 async fn test_fragment_id_assignment() {
2197 let schema = Arc::new(Schema::new(vec![Field::new(
2199 VALUE_COLUMN_NAME,
2200 DataType::Int32,
2201 false,
2202 )]));
2203
2204 let fragment0_data = arrow_array::Int32Array::from_iter_values(0..5);
2206 let fragment0_batch =
2207 RecordBatch::try_new(schema.clone(), vec![Arc::new(fragment0_data)]).unwrap();
2208
2209 let fragment1_data = arrow_array::Int32Array::from_iter_values(5..10);
2210 let fragment1_batch =
2211 RecordBatch::try_new(schema.clone(), vec![Arc::new(fragment1_data)]).unwrap();
2212
2213 let aligned_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2214 schema,
2215 stream::iter(vec![Ok(fragment0_batch), Ok(fragment1_batch)]),
2216 ));
2217
2218 let aligned_stream = add_row_addr(aligned_stream);
2219
2220 let batches: Vec<RecordBatch> = aligned_stream.try_collect().await.unwrap();
2221
2222 assert_eq!(batches.len(), 2);
2223
2224 let fragment0_rowaddr_col = batches[0].column_by_name(ROW_ADDR).unwrap();
2226 let fragment0_rowaddrs = fragment0_rowaddr_col
2227 .as_any()
2228 .downcast_ref::<UInt64Array>()
2229 .unwrap();
2230
2231 assert_eq!(fragment0_rowaddrs.values(), &[0, 1, 2, 3, 4]);
2233
2234 let fragment1_rowaddr_col = batches[1].column_by_name(ROW_ADDR).unwrap();
2236 let fragment1_rowaddrs = fragment1_rowaddr_col
2237 .as_any()
2238 .downcast_ref::<UInt64Array>()
2239 .unwrap();
2240
2241 assert_eq!(
2244 fragment1_rowaddrs.values(),
2245 &[4294967296, 4294967297, 4294967298, 4294967299, 4294967300]
2246 );
2247 }
2248
2249 #[tokio::test]
2250 async fn test_like_prefix_query() {
2251 let tmpdir = TempObjDir::default();
2252 let test_store = Arc::new(LanceIndexStore::new(
2253 Arc::new(ObjectStore::local()),
2254 tmpdir.clone(),
2255 Arc::new(LanceCache::no_cache()),
2256 ));
2257
2258 let zones = vec![
2267 ZoneMapStatistics {
2268 min: ScalarValue::Utf8(Some("aaa".to_string())),
2269 max: ScalarValue::Utf8(Some("azz".to_string())),
2270 null_count: 0,
2271 nan_count: 0,
2272 bound: ZoneBound {
2273 fragment_id: 0,
2274 start: 0,
2275 length: 100,
2276 },
2277 },
2278 ZoneMapStatistics {
2279 min: ScalarValue::Utf8(Some("bar".to_string())),
2280 max: ScalarValue::Utf8(Some("baz".to_string())),
2281 null_count: 0,
2282 nan_count: 0,
2283 bound: ZoneBound {
2284 fragment_id: 1,
2285 start: 0,
2286 length: 100,
2287 },
2288 },
2289 ZoneMapStatistics {
2290 min: ScalarValue::Utf8(Some("fa".to_string())),
2291 max: ScalarValue::Utf8(Some("foz".to_string())),
2292 null_count: 0,
2293 nan_count: 0,
2294 bound: ZoneBound {
2295 fragment_id: 2,
2296 start: 0,
2297 length: 100,
2298 },
2299 },
2300 ZoneMapStatistics {
2301 min: ScalarValue::Utf8(Some("fop".to_string())),
2302 max: ScalarValue::Utf8(Some("fzz".to_string())),
2303 null_count: 0,
2304 nan_count: 0,
2305 bound: ZoneBound {
2306 fragment_id: 3,
2307 start: 0,
2308 length: 100,
2309 },
2310 },
2311 ZoneMapStatistics {
2312 min: ScalarValue::Utf8(Some("foo".to_string())),
2313 max: ScalarValue::Utf8(Some("foobar".to_string())),
2314 null_count: 0,
2315 nan_count: 0,
2316 bound: ZoneBound {
2317 fragment_id: 4,
2318 start: 0,
2319 length: 100,
2320 },
2321 },
2322 ZoneMapStatistics {
2323 min: ScalarValue::Utf8(Some("gaa".to_string())),
2324 max: ScalarValue::Utf8(Some("gzz".to_string())),
2325 null_count: 0,
2326 nan_count: 0,
2327 bound: ZoneBound {
2328 fragment_id: 5,
2329 start: 0,
2330 length: 100,
2331 },
2332 },
2333 ];
2334
2335 let index = ZoneMapIndex {
2336 zones,
2337 data_type: DataType::Utf8,
2338 rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2339 store: test_store,
2340 fri: None,
2341 index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2342 };
2343
2344 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("foo".to_string())));
2346 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2347
2348 let mut expected = RowAddrTreeMap::new();
2350 expected.insert_range((2u64 << 32)..((2u64 << 32) + 100));
2352 expected.insert_range((4u64 << 32)..((4u64 << 32) + 100));
2354
2355 assert_eq!(result, SearchResult::at_most(expected));
2356 }
2357
2358 #[tokio::test]
2359 async fn test_like_prefix_edge_cases() {
2360 let tmpdir = TempObjDir::default();
2361 let test_store = Arc::new(LanceIndexStore::new(
2362 Arc::new(ObjectStore::local()),
2363 tmpdir.clone(),
2364 Arc::new(LanceCache::no_cache()),
2365 ));
2366
2367 let zones = vec![
2369 ZoneMapStatistics {
2371 min: ScalarValue::Utf8(Some("test".to_string())),
2372 max: ScalarValue::Utf8(Some("test".to_string())),
2373 null_count: 0,
2374 nan_count: 0,
2375 bound: ZoneBound {
2376 fragment_id: 0,
2377 start: 0,
2378 length: 100,
2379 },
2380 },
2381 ZoneMapStatistics {
2383 min: ScalarValue::Utf8(Some("te".to_string())),
2384 max: ScalarValue::Utf8(Some("tf".to_string())),
2385 null_count: 0,
2386 nan_count: 0,
2387 bound: ZoneBound {
2388 fragment_id: 1,
2389 start: 0,
2390 length: 100,
2391 },
2392 },
2393 ZoneMapStatistics {
2395 min: ScalarValue::Utf8(Some("abc".to_string())),
2396 max: ScalarValue::Utf8(Some("def".to_string())),
2397 null_count: 0,
2398 nan_count: 0,
2399 bound: ZoneBound {
2400 fragment_id: 2,
2401 start: 0,
2402 length: 100,
2403 },
2404 },
2405 ];
2406
2407 let index = ZoneMapIndex {
2408 zones,
2409 data_type: DataType::Utf8,
2410 rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2411 store: test_store,
2412 fri: None,
2413 index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2414 };
2415
2416 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test".to_string())));
2418 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2419
2420 let mut expected = RowAddrTreeMap::new();
2422 expected.insert_range(0..100); expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2424
2425 assert_eq!(result, SearchResult::at_most(expected));
2426
2427 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("".to_string())));
2429 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2430
2431 let mut expected = RowAddrTreeMap::new();
2432 expected.insert_range(0..100); expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2434 expected.insert_range((2u64 << 32)..((2u64 << 32) + 100));
2435
2436 assert_eq!(result, SearchResult::at_most(expected));
2437 }
2438
2439 #[tokio::test]
2440 async fn test_like_prefix_large_utf8() {
2441 let tmpdir = TempObjDir::default();
2442 let test_store = Arc::new(LanceIndexStore::new(
2443 Arc::new(ObjectStore::local()),
2444 tmpdir.clone(),
2445 Arc::new(LanceCache::no_cache()),
2446 ));
2447
2448 let zones = vec![
2450 ZoneMapStatistics {
2451 min: ScalarValue::LargeUtf8(Some("aaa".to_string())),
2452 max: ScalarValue::LargeUtf8(Some("azz".to_string())),
2453 null_count: 0,
2454 nan_count: 0,
2455 bound: ZoneBound {
2456 fragment_id: 0,
2457 start: 0,
2458 length: 100,
2459 },
2460 },
2461 ZoneMapStatistics {
2462 min: ScalarValue::LargeUtf8(Some("foo".to_string())),
2463 max: ScalarValue::LargeUtf8(Some("foobar".to_string())),
2464 null_count: 0,
2465 nan_count: 0,
2466 bound: ZoneBound {
2467 fragment_id: 1,
2468 start: 0,
2469 length: 100,
2470 },
2471 },
2472 ];
2473
2474 let index = ZoneMapIndex {
2475 zones,
2476 data_type: DataType::LargeUtf8,
2477 rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2478 store: test_store,
2479 fri: None,
2480 index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2481 };
2482
2483 let query = SargableQuery::LikePrefix(ScalarValue::LargeUtf8(Some("foo".to_string())));
2485 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2486
2487 let mut expected = RowAddrTreeMap::new();
2489 expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2490
2491 assert_eq!(result, SearchResult::at_most(expected));
2492 }
2493
/// Table-driven check of `compute_next_prefix`, the exclusive upper bound used for
/// prefix range scans.
///
/// Each case pairs an input prefix with the expected successor string, or `None`
/// when no successor is expected.
#[test]
fn test_compute_next_prefix() {
    use super::compute_next_prefix;

    let cases: &[(&str, Option<&str>)] = &[
        // Plain ASCII: the last character is bumped by one code point.
        ("foo", Some("fop")),
        ("abc", Some("abd")),
        ("a", Some("b")),
        ("z", Some("{")),
        ("abz", Some("ab{")),
        // '~' (0x7E) steps to DEL (0x7F).
        ("ab~", Some("ab\x7f")),
        // The empty prefix has no successor.
        ("", None),
        // Multi-byte characters are bumped as code points, not as bytes.
        ("café", Some("cafê")),
        ("abc中", Some("abc丮")),
        ("cafÿ", Some("cafĀ")),
        // The UTF-16 surrogate range D800..=DFFF is skipped.
        ("a\u{D7FF}", Some("a\u{E000}")),
        // U+10FFFF cannot be bumped, so the carry moves to the previous character...
        ("ab\u{10FFFF}", Some("ac")),
        // ...and a prefix made only of U+10FFFF has no successor at all.
        ("\u{10FFFF}\u{10FFFF}", None),
    ];

    for &(prefix, expected) in cases {
        assert_eq!(
            compute_next_prefix(prefix),
            expected.map(String::from),
            "compute_next_prefix({prefix:?})"
        );
    }
}
2533}