Skip to main content

lance_index/scalar/
zonemap.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Zone Map Index
5//!
6//! Zone maps are a columnar database technique for predicate pushdown and scan pruning.
7//! They break data into fixed-size chunks called "zones" and maintain summary statistics
8//! (min, max, null count) for each zone. This enables efficient filtering by eliminating
9//! zones that cannot contain matching values.
10//!
11//! Zone maps are "inexact" filters - they can definitively exclude zones but may include
12//! false positives that require rechecking.
13//!
14//!
15use crate::Any;
16use crate::pbold;
17use crate::scalar::expression::{SargableQueryParser, ScalarQueryParser};
18use crate::scalar::registry::{
19    ScalarIndexPlugin, TrainingCriteria, TrainingOrdering, TrainingRequest,
20};
21use crate::scalar::{
22    BuiltinIndexType, CreatedIndex, SargableQuery, ScalarIndexParams, UpdateCriteria,
23    compute_next_prefix,
24};
25use datafusion::functions_aggregate::min_max::{MaxAccumulator, MinAccumulator};
26use datafusion_expr::Accumulator;
27use lance_core::cache::{LanceCache, WeakLanceCache};
28use serde::{Deserialize, Serialize};
29use std::sync::LazyLock;
30
31use arrow_array::{ArrayRef, RecordBatch, UInt32Array, UInt64Array, new_empty_array};
32use arrow_schema::{DataType, Field};
33use datafusion::execution::SendableRecordBatchStream;
34use datafusion_common::ScalarValue;
35use std::{collections::HashMap, sync::Arc};
36
37use super::{AnyQuery, IndexStore, MetricsCollector, ScalarIndex, SearchResult};
38use crate::scalar::FragReuseIndex;
39use crate::vector::VectorIndex;
40use crate::{Index, IndexType};
41use async_trait::async_trait;
42use deepsize::DeepSizeOf;
43use lance_core::Error;
44use lance_core::Result;
45use roaring::RoaringBitmap;
46
47use super::zoned::{ZoneBound, ZoneProcessor, ZoneTrainer, rebuild_zones, search_zones};
/// Fallback number of rows summarized by one zone: two 4096-row batches per zone.
const ROWS_PER_ZONE_DEFAULT: u64 = 2 * 4096;

/// Name of the file, inside the index store, holding the serialized zone statistics.
const ZONEMAP_FILENAME: &str = "zonemap.lance";
/// Schema-metadata key under which the zone size is persisted in the index file.
const ZONEMAP_SIZE_META_KEY: &str = "rows_per_zone";
/// Index format version reported for newly written zone map indices.
const ZONEMAP_INDEX_VERSION: u32 = 0;
53
54/// Basic stats about zonemap index
55#[derive(Debug, PartialEq, Clone)]
56struct ZoneMapStatistics {
57    min: ScalarValue,
58    max: ScalarValue,
59    null_count: u32,
60    // only apply to float type
61    nan_count: u32,
62    // Bound of this zone within the fragment. Persisted as three separate columns
63    // (fragment_id, zone_start, zone_length) in the index file.
64    bound: ZoneBound,
65}
66
67impl DeepSizeOf for ZoneMapStatistics {
68    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
69        // Estimate sizes for ScalarValue
70        let min_size = self.min.size() - std::mem::size_of::<ScalarValue>();
71        let max_size = self.max.size() - std::mem::size_of::<ScalarValue>();
72
73        min_size + max_size
74    }
75}
76
77impl AsRef<ZoneBound> for ZoneMapStatistics {
78    fn as_ref(&self) -> &ZoneBound {
79        &self.bound
80    }
81}
82
83/// ZoneMap index
84/// At high level it's a columnar database technique for predicate push down and scan pruning.
85/// It breaks data into fixed-size chunks called `zones` and store summary statistics(min, max, null_count,
86/// nan_count, fragment_id, local_row_offset) for each zone. It enables efficient filtering by skipping zones that do not contain matching values
87///
88/// This is an inexact filter, similar to a bloom filter. It can return false positives that require rechecking.
89///
90/// Note that it cannot return false negatives.
91/// Input:
92/// * Fragment 1: - 10 rows   -> 0  -> 9
93/// * Fragment 2: - 7 rows    -> 10 -> 16
94/// * Fragment 3: - 4 rows    -> 20 -> 23
95/// * Zone size AKA “rows_per_zone” (from user) - 5
96///
97/// Output:
98/// fragment id | min | max | zone_length
99/// 1           | 0   |  4  | 5
100/// 1           | 5   |  9  | 5
101/// 2           | 10  | 14  | 5
102/// 2           | 15  | 16  | 2
103/// 3           | 20  | 23  | 4
104pub struct ZoneMapIndex {
105    zones: Vec<ZoneMapStatistics>,
106    data_type: DataType,
107    // The maximum rows per zone provided by user
108    rows_per_zone: u64,
109    store: Arc<dyn IndexStore>,
110    fri: Option<Arc<FragReuseIndex>>,
111    index_cache: WeakLanceCache,
112}
113
114impl std::fmt::Debug for ZoneMapIndex {
115    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
116        f.debug_struct("ZoneMapIndex")
117            .field("zones", &self.zones)
118            .field("data_type", &self.data_type)
119            .field("rows_per_zone", &self.rows_per_zone)
120            .field("store", &self.store)
121            .field("fri", &self.fri)
122            .field("index_cache", &self.index_cache)
123            .finish()
124    }
125}
126
127impl DeepSizeOf for ZoneMapIndex {
128    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
129        self.zones.deep_size_of_children(context)
130    }
131}
132
133impl ZoneMapIndex {
134    /// Evaluates whether a zone could potentially contain values matching the query
135    /// For NaN, total order is used here
136    /// reference: https://doc.rust-lang.org/std/primitive.f64.html#method.total_cmp
137    fn evaluate_zone_against_query(
138        &self,
139        zone: &ZoneMapStatistics,
140        query: &SargableQuery,
141    ) -> Result<bool> {
142        use std::ops::Bound;
143
144        match query {
145            SargableQuery::IsNull() => {
146                // Zone contains matching values if it has any null values
147                Ok(zone.null_count > 0)
148            }
149            SargableQuery::Equals(target) => {
150                // Zone contains matching values if target falls within [min, max] range
151                // Handle null values - if target is null, check null_count
152                if target.is_null() {
153                    return Ok(zone.null_count > 0);
154                }
155
156                // Handle NaN values - if target is NaN, check nan_count
157                let is_nan = match target {
158                    ScalarValue::Float16(Some(f)) => f.is_nan(),
159                    ScalarValue::Float32(Some(f)) => f.is_nan(),
160                    ScalarValue::Float64(Some(f)) => f.is_nan(),
161                    _ => false,
162                };
163
164                if is_nan {
165                    return Ok(zone.nan_count > 0);
166                }
167
168                // Check if target is within the zone's range
169                // Handle the case where zone.max is NaN (zone contains both finite values and NaN)
170                let min_check = target >= &zone.min;
171                let max_check = match &zone.max {
172                    ScalarValue::Float16(Some(f)) if f.is_nan() => true,
173                    ScalarValue::Float32(Some(f)) if f.is_nan() => true,
174                    ScalarValue::Float64(Some(f)) if f.is_nan() => true,
175                    _ => target <= &zone.max,
176                };
177                Ok(min_check && max_check)
178            }
179            SargableQuery::Range(start, end) => {
180                // Zone overlaps with query range if there's any intersection between
181                // the zone's [min, max] and the query's range
182                let zone_min = &zone.min;
183                let zone_max = &zone.max;
184
185                let start_check = match start {
186                    Bound::Unbounded => true,
187                    Bound::Included(s) => {
188                        // Handle NaN in range bounds - NaN is greater than all finite values
189                        match s {
190                            ScalarValue::Float16(Some(f)) => {
191                                if f.is_nan() {
192                                    return Ok(zone.nan_count > 0);
193                                }
194                            }
195                            ScalarValue::Float32(Some(f)) => {
196                                if f.is_nan() {
197                                    return Ok(zone.nan_count > 0);
198                                }
199                            }
200                            ScalarValue::Float64(Some(f)) => {
201                                if f.is_nan() {
202                                    return Ok(zone.nan_count > 0);
203                                }
204                            }
205                            _ => {}
206                        }
207                        // Handle the case where zone_max is NaN
208                        // If zone_max is NaN, the zone contains both finite values and NaN
209                        // Since we don't know the actual max, we'll be conservative and include the zone
210                        match zone_max {
211                            ScalarValue::Float16(Some(f)) if f.is_nan() => true,
212                            ScalarValue::Float32(Some(f)) if f.is_nan() => true,
213                            ScalarValue::Float64(Some(f)) if f.is_nan() => true,
214                            _ => zone_max >= s,
215                        }
216                    }
217                    Bound::Excluded(s) => {
218                        // Handle NaN in range bounds
219                        match s {
220                            ScalarValue::Float16(Some(f)) => {
221                                if f.is_nan() {
222                                    return Ok(false); // Nothing is greater than NaN
223                                }
224                            }
225                            ScalarValue::Float32(Some(f)) => {
226                                if f.is_nan() {
227                                    return Ok(false); // Nothing is greater than NaN
228                                }
229                            }
230                            ScalarValue::Float64(Some(f)) => {
231                                if f.is_nan() {
232                                    return Ok(false); // Nothing is greater than NaN
233                                }
234                            }
235                            _ => {}
236                        }
237                        zone_max > s
238                    }
239                };
240
241                let end_check = match end {
242                    Bound::Unbounded => true,
243                    Bound::Included(e) => {
244                        // Handle NaN in range bounds
245                        match e {
246                            ScalarValue::Float16(Some(f)) => {
247                                if f.is_nan() {
248                                    // NaN is included, so check if zone has NaN values or finite values
249                                    return Ok(zone.nan_count > 0 || zone_min <= e);
250                                }
251                            }
252                            ScalarValue::Float32(Some(f)) => {
253                                if f.is_nan() {
254                                    return Ok(zone.nan_count > 0 || zone_min <= e);
255                                }
256                            }
257                            ScalarValue::Float64(Some(f)) => {
258                                if f.is_nan() {
259                                    return Ok(zone.nan_count > 0 || zone_min <= e);
260                                }
261                            }
262                            _ => {}
263                        }
264                        zone_min <= e
265                    }
266                    Bound::Excluded(e) => {
267                        // Handle NaN in range bounds
268                        match e {
269                            ScalarValue::Float16(Some(f)) => {
270                                if f.is_nan() {
271                                    // Everything is less than NaN, so include all finite values
272                                    return Ok(true);
273                                }
274                            }
275                            ScalarValue::Float32(Some(f)) => {
276                                if f.is_nan() {
277                                    return Ok(true);
278                                }
279                            }
280                            ScalarValue::Float64(Some(f)) => {
281                                if f.is_nan() {
282                                    return Ok(true);
283                                }
284                            }
285                            _ => {}
286                        }
287                        zone_min < e
288                    }
289                };
290
291                Ok(start_check && end_check)
292            }
293            SargableQuery::IsIn(values) => {
294                // Zone contains matching values if any value in the set falls within [min, max]
295                Ok(values.iter().any(|value| {
296                    if value.is_null() {
297                        zone.null_count > 0
298                    } else {
299                        match value {
300                            ScalarValue::Float16(Some(f)) => {
301                                if f.is_nan() {
302                                    zone.nan_count > 0
303                                } else {
304                                    value >= &zone.min && value <= &zone.max
305                                }
306                            }
307                            ScalarValue::Float32(Some(f)) => {
308                                if f.is_nan() {
309                                    zone.nan_count > 0
310                                } else {
311                                    value >= &zone.min && value <= &zone.max
312                                }
313                            }
314                            ScalarValue::Float64(Some(f)) => {
315                                if f.is_nan() {
316                                    zone.nan_count > 0
317                                } else {
318                                    value >= &zone.min && value <= &zone.max
319                                }
320                            }
321                            _ => value >= &zone.min && value <= &zone.max,
322                        }
323                    }
324                }))
325            }
326            SargableQuery::FullTextSearch(_) => Err(Error::not_supported_source(
327                "full text search is not supported for zonemap indexes".into(),
328            )),
329            SargableQuery::LikePrefix(prefix) => {
330                // For prefix matching, a zone can match if:
331                // - zone.max >= prefix (there could be values >= prefix)
332                // - zone.min < next_prefix (there could be values < next_prefix)
333                //
334                // For example, prefix "foo":
335                // - Zone [aaa, azz]: max="azz" < "foo", so no match
336                // - Zone [fa, foz]: min="fa" < "fop", max="foz" >= "foo", so potential match
337                // - Zone [fop, fzz]: min="fop" >= "fop", so no match
338
339                let prefix_str = match prefix {
340                    ScalarValue::Utf8(Some(s)) => s.as_str(),
341                    ScalarValue::LargeUtf8(Some(s)) => s.as_str(),
342                    _ => return Ok(true), // Conservative: include zone if not a string prefix
343                };
344
345                // Empty prefix matches everything
346                if prefix_str.is_empty() {
347                    return Ok(true);
348                }
349
350                // Check zone.max >= prefix
351                let max_check = &zone.max >= prefix;
352                if !max_check {
353                    return Ok(false);
354                }
355
356                // Compute next_prefix by incrementing the last byte
357                // If the prefix ends with 0xFF bytes, we need to handle overflow
358                let next_prefix = compute_next_prefix(prefix_str);
359
360                match next_prefix {
361                    Some(next) => {
362                        // Check zone.min < next_prefix
363                        let next_scalar = match prefix {
364                            ScalarValue::Utf8(_) => ScalarValue::Utf8(Some(next)),
365                            ScalarValue::LargeUtf8(_) => ScalarValue::LargeUtf8(Some(next)),
366                            _ => return Ok(true),
367                        };
368                        Ok(zone.min < next_scalar)
369                    }
370                    None => {
371                        // No upper bound (prefix is all 0xFF), so any zone with max >= prefix matches
372                        Ok(true)
373                    }
374                }
375            }
376        }
377    }
378
379    /// Load the scalar index from storage
380    async fn load(
381        store: Arc<dyn IndexStore>,
382        fri: Option<Arc<FragReuseIndex>>,
383        index_cache: &LanceCache,
384    ) -> Result<Arc<Self>>
385    where
386        Self: Sized,
387    {
388        let index_file = store.open_index_file(ZONEMAP_FILENAME).await?;
389        let zone_maps = index_file
390            .read_range(0..index_file.num_rows(), None)
391            .await?;
392        let file_schema = index_file.schema();
393
394        let rows_per_zone: u64 = file_schema
395            .metadata
396            .get(ZONEMAP_SIZE_META_KEY)
397            .and_then(|bs| bs.parse().ok())
398            .unwrap_or(ROWS_PER_ZONE_DEFAULT);
399        Ok(Arc::new(Self::try_from_serialized(
400            zone_maps,
401            store,
402            fri,
403            index_cache,
404            rows_per_zone,
405        )?))
406    }
407
408    fn try_from_serialized(
409        data: RecordBatch,
410        store: Arc<dyn IndexStore>,
411        fri: Option<Arc<FragReuseIndex>>,
412        index_cache: &LanceCache,
413        rows_per_zone: u64,
414    ) -> Result<Self> {
415        // The RecordBatch should have columns: min, max, null_count
416        let min_col = data
417            .column_by_name("min")
418            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'min' column"))?;
419        let max_col = data
420            .column_by_name("max")
421            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'max' column"))?;
422        let null_count_col = data
423            .column_by_name("null_count")
424            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'null_count' column"))?
425            .as_any()
426            .downcast_ref::<arrow_array::UInt32Array>()
427            .ok_or_else(|| {
428                Error::invalid_input("ZoneMapIndex: 'null_count' column is not UInt32")
429            })?;
430        let nan_count_col = data
431            .column_by_name("nan_count")
432            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'nan_count' column"))?
433            .as_any()
434            .downcast_ref::<arrow_array::UInt32Array>()
435            .ok_or_else(|| {
436                Error::invalid_input("ZoneMapIndex: 'nan_count' column is not UInt32")
437            })?;
438        let zone_length = data
439            .column_by_name("zone_length")
440            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'zone_length' column"))?
441            .as_any()
442            .downcast_ref::<arrow_array::UInt64Array>()
443            .ok_or_else(|| {
444                Error::invalid_input("ZoneMapIndex: 'zone_length' column is not UInt64")
445            })?;
446
447        let fragment_id_col = data
448            .column_by_name("fragment_id")
449            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'fragment_id' column"))?
450            .as_any()
451            .downcast_ref::<arrow_array::UInt64Array>()
452            .ok_or_else(|| {
453                Error::invalid_input("ZoneMapIndex: 'fragment_id' column is not UInt64")
454            })?;
455
456        let zone_start_col = data
457            .column_by_name("zone_start")
458            .ok_or_else(|| Error::invalid_input("ZoneMapIndex: missing 'zone_start' column"))?
459            .as_any()
460            .downcast_ref::<arrow_array::UInt64Array>()
461            .ok_or_else(|| {
462                Error::invalid_input("ZoneMapIndex: 'zone_start' column is not UInt64")
463            })?;
464
465        let data_type = min_col.data_type().clone();
466
467        if data.num_rows() == 0 {
468            return Ok(Self {
469                zones: Vec::new(),
470                data_type,
471                rows_per_zone,
472                store,
473                fri,
474                index_cache: WeakLanceCache::from(index_cache),
475            });
476        }
477
478        let num_zones = data.num_rows();
479        let mut zones = Vec::with_capacity(num_zones);
480
481        for i in 0..num_zones {
482            let min = ScalarValue::try_from_array(min_col, i)?;
483            let max = ScalarValue::try_from_array(max_col, i)?;
484            let null_count = null_count_col.value(i);
485            let nan_count = nan_count_col.value(i);
486            zones.push(ZoneMapStatistics {
487                min,
488                max,
489                null_count,
490                nan_count,
491                bound: ZoneBound {
492                    fragment_id: fragment_id_col.value(i),
493                    start: zone_start_col.value(i),
494                    length: zone_length.value(i) as usize,
495                },
496            });
497        }
498
499        Ok(Self {
500            zones,
501            data_type,
502            rows_per_zone,
503            store,
504            fri,
505            index_cache: WeakLanceCache::from(index_cache),
506        })
507    }
508}
509
510#[async_trait]
511impl Index for ZoneMapIndex {
512    fn as_any(&self) -> &dyn Any {
513        self
514    }
515
516    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
517        self
518    }
519
520    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn VectorIndex>> {
521        Err(Error::invalid_input_source(
522            "ZoneMapIndex is not a vector index".into(),
523        ))
524    }
525
526    async fn prewarm(&self) -> Result<()> {
527        // Not much to prewarm
528        Ok(())
529    }
530
531    fn statistics(&self) -> Result<serde_json::Value> {
532        Ok(serde_json::json!({
533            "num_zones": self.zones.len(),
534            "rows_per_zone": self.rows_per_zone,
535        }))
536    }
537
538    fn index_type(&self) -> IndexType {
539        IndexType::ZoneMap
540    }
541
542    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
543        let mut frag_ids = RoaringBitmap::new();
544
545        // Loop through zones and add unique fragment IDs to the bitmap
546        for zone in &self.zones {
547            frag_ids.insert(zone.bound.fragment_id as u32);
548        }
549
550        Ok(frag_ids)
551    }
552}
553
554#[async_trait]
555impl ScalarIndex for ZoneMapIndex {
556    async fn search(
557        &self,
558        query: &dyn AnyQuery,
559        metrics: &dyn MetricsCollector,
560    ) -> Result<SearchResult> {
561        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
562        search_zones(&self.zones, metrics, |zone| {
563            self.evaluate_zone_against_query(zone, query)
564        })
565    }
566
567    fn can_remap(&self) -> bool {
568        false
569    }
570
571    /// Remap the row ids, creating a new remapped version of this index in `dest_store`
572    async fn remap(
573        &self,
574        _mapping: &HashMap<u64, Option<u64>>,
575        _dest_store: &dyn IndexStore,
576    ) -> Result<CreatedIndex> {
577        Err(Error::invalid_input_source(
578            "ZoneMapIndex does not support remap".into(),
579        ))
580    }
581
582    /// Add the new data , creating an updated version of the index in `dest_store`
583    async fn update(
584        &self,
585        new_data: SendableRecordBatchStream,
586        dest_store: &dyn IndexStore,
587        _old_data_filter: Option<super::OldIndexDataFilter>,
588    ) -> Result<CreatedIndex> {
589        // Train new zones for the incoming data stream
590        let schema = new_data.schema();
591        let value_type = schema.field(0).data_type().clone();
592
593        let options = ZoneMapIndexBuilderParams::new(self.rows_per_zone);
594        let processor = ZoneMapProcessor::new(value_type.clone())?;
595        let trainer = ZoneTrainer::new(processor, self.rows_per_zone)?;
596        let updated_zones = rebuild_zones(&self.zones, trainer, new_data).await?;
597
598        // Serialize the combined zones back into the index file
599        let mut builder = ZoneMapIndexBuilder::try_new(options, self.data_type.clone())?;
600        builder.options.rows_per_zone = self.rows_per_zone;
601        builder.maps = updated_zones;
602        builder.write_index(dest_store).await?;
603
604        Ok(CreatedIndex {
605            index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default())
606                .unwrap(),
607            index_version: ZONEMAP_INDEX_VERSION,
608            files: Some(dest_store.list_files_with_sizes().await?),
609        })
610    }
611
612    fn update_criteria(&self) -> UpdateCriteria {
613        UpdateCriteria::only_new_data(
614            TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
615        )
616    }
617
618    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
619        let params = serde_json::to_value(ZoneMapIndexBuilderParams::new(self.rows_per_zone))?;
620        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::ZoneMap).with_params(&params))
621    }
622}
623
624fn default_rows_per_zone() -> u64 {
625    *DEFAULT_ROWS_PER_ZONE
626}
627
628#[derive(Debug, Clone, Serialize, Deserialize)]
629pub struct ZoneMapIndexBuilderParams {
630    #[serde(default = "default_rows_per_zone")]
631    rows_per_zone: u64,
632}
633
634static DEFAULT_ROWS_PER_ZONE: LazyLock<u64> = LazyLock::new(|| {
635    std::env::var("LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE")
636        .unwrap_or_else(|_| (ROWS_PER_ZONE_DEFAULT).to_string())
637        .parse()
638        .expect("failed to parse LANCE_ZONEMAP_DEFAULT_ROWS_PER_ZONE")
639});
640
641impl Default for ZoneMapIndexBuilderParams {
642    fn default() -> Self {
643        Self {
644            rows_per_zone: *DEFAULT_ROWS_PER_ZONE,
645        }
646    }
647}
648
649impl ZoneMapIndexBuilderParams {
650    pub fn new(rows_per_zone: u64) -> Self {
651        Self { rows_per_zone }
652    }
653
654    pub fn rows_per_zone(&self) -> u64 {
655        self.rows_per_zone
656    }
657}
658
659// A builder for zonemap index
660pub struct ZoneMapIndexBuilder {
661    options: ZoneMapIndexBuilderParams,
662
663    items_type: DataType,
664    maps: Vec<ZoneMapStatistics>,
665}
666
667impl ZoneMapIndexBuilder {
668    pub fn try_new(options: ZoneMapIndexBuilderParams, items_type: DataType) -> Result<Self> {
669        Ok(Self {
670            options,
671            items_type,
672            maps: Vec::new(),
673        })
674    }
675
676    /// Train the builder using the shared zone trainer.  The input stream must contain
677    /// the value column followed by `_rowaddr`, matching the dataset scan order enforced
678    /// by the scalar index registry.
679    pub async fn train(&mut self, batches_source: SendableRecordBatchStream) -> Result<()> {
680        let processor = ZoneMapProcessor::new(self.items_type.clone())?;
681        let trainer = ZoneTrainer::new(processor, self.options.rows_per_zone)?;
682        self.maps = trainer.train(batches_source).await?;
683        Ok(())
684    }
685
686    fn zonemap_stats_as_batch(&self) -> Result<RecordBatch> {
687        // Flush self.maps as a RecordBatch
688        let mins = if self.maps.is_empty() {
689            new_empty_array(&self.items_type)
690        } else {
691            ScalarValue::iter_to_array(self.maps.iter().map(|stat| stat.min.clone()))?
692        };
693        let maxs = if self.maps.is_empty() {
694            new_empty_array(&self.items_type)
695        } else {
696            ScalarValue::iter_to_array(self.maps.iter().map(|stat| stat.max.clone()))?
697        };
698        let null_counts =
699            UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.null_count));
700
701        let nan_counts = UInt32Array::from_iter_values(self.maps.iter().map(|stat| stat.nan_count));
702
703        let fragment_ids =
704            UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.fragment_id));
705
706        let zone_lengths =
707            UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.length as u64));
708
709        let zone_starts =
710            UInt64Array::from_iter_values(self.maps.iter().map(|stat| stat.bound.start));
711
712        let schema = Arc::new(arrow_schema::Schema::new(vec![
713            // min and max can be null if the entire batch is null values
714            Field::new("min", self.items_type.clone(), true),
715            Field::new("max", self.items_type.clone(), true),
716            Field::new("null_count", DataType::UInt32, false),
717            Field::new("nan_count", DataType::UInt32, false),
718            Field::new("fragment_id", DataType::UInt64, false),
719            Field::new("zone_start", DataType::UInt64, false),
720            Field::new("zone_length", DataType::UInt64, false),
721        ]));
722
723        let columns: Vec<ArrayRef> = vec![
724            mins,
725            maxs,
726            Arc::new(null_counts) as ArrayRef,
727            Arc::new(nan_counts) as ArrayRef,
728            Arc::new(fragment_ids) as ArrayRef,
729            Arc::new(zone_starts) as ArrayRef,
730            Arc::new(zone_lengths) as ArrayRef,
731        ];
732        Ok(RecordBatch::try_new(schema, columns)?)
733    }
734
735    pub async fn write_index(self, index_store: &dyn IndexStore) -> Result<()> {
736        let record_batch = self.zonemap_stats_as_batch()?;
737
738        let mut file_schema = record_batch.schema().as_ref().clone();
739        file_schema.metadata.insert(
740            ZONEMAP_SIZE_META_KEY.to_string(),
741            self.options.rows_per_zone.to_string(),
742        );
743
744        let mut index_file = index_store
745            .new_index_file(ZONEMAP_FILENAME, Arc::new(file_schema))
746            .await?;
747        index_file.write_record_batch(record_batch).await?;
748        index_file.finish().await?;
749        Ok(())
750    }
751}
752
753/// Index-specific processor that computes min/max statistics for each zone while the
754/// trainer takes care of chunking and fragment boundaries.
755struct ZoneMapProcessor {
756    data_type: DataType,
757    min: MinAccumulator,
758    max: MaxAccumulator,
759    null_count: u32,
760    nan_count: u32,
761}
762
763impl ZoneMapProcessor {
764    fn new(data_type: DataType) -> Result<Self> {
765        let min = MinAccumulator::try_new(&data_type)?;
766        let max = MaxAccumulator::try_new(&data_type)?;
767        Ok(Self {
768            data_type,
769            min,
770            max,
771            null_count: 0,
772            nan_count: 0,
773        })
774    }
775
776    fn count_nans(array: &ArrayRef) -> u32 {
777        match array.data_type() {
778            DataType::Float16 => {
779                let array = array
780                    .as_any()
781                    .downcast_ref::<arrow_array::Float16Array>()
782                    .unwrap();
783                array.values().iter().filter(|&&x| x.is_nan()).count() as u32
784            }
785            DataType::Float32 => {
786                let array = array
787                    .as_any()
788                    .downcast_ref::<arrow_array::Float32Array>()
789                    .unwrap();
790                array.values().iter().filter(|&&x| x.is_nan()).count() as u32
791            }
792            DataType::Float64 => {
793                let array = array
794                    .as_any()
795                    .downcast_ref::<arrow_array::Float64Array>()
796                    .unwrap();
797                array.values().iter().filter(|&&x| x.is_nan()).count() as u32
798            }
799            _ => 0,
800        }
801    }
802}
803
804impl ZoneProcessor for ZoneMapProcessor {
805    type ZoneStatistics = ZoneMapStatistics;
806
807    fn process_chunk(&mut self, array: &ArrayRef) -> Result<()> {
808        self.null_count += array.null_count() as u32;
809        self.nan_count += Self::count_nans(array);
810        self.min.update_batch(std::slice::from_ref(array))?;
811        self.max.update_batch(std::slice::from_ref(array))?;
812        Ok(())
813    }
814
815    fn finish_zone(&mut self, bound: ZoneBound) -> Result<Self::ZoneStatistics> {
816        Ok(ZoneMapStatistics {
817            min: self.min.evaluate()?,
818            max: self.max.evaluate()?,
819            null_count: self.null_count,
820            nan_count: self.nan_count,
821            bound,
822        })
823    }
824
825    fn reset(&mut self) -> Result<()> {
826        self.min = MinAccumulator::try_new(&self.data_type)?;
827        self.max = MaxAccumulator::try_new(&self.data_type)?;
828        self.null_count = 0;
829        self.nan_count = 0;
830        Ok(())
831    }
832}
833
/// Scalar index plugin that trains and loads zone map indices (plugin name `"ZoneMap"`).
#[derive(Default, Debug)]
pub struct ZoneMapIndexPlugin;
836
837impl ZoneMapIndexPlugin {
838    async fn train_zonemap_index(
839        batches_source: SendableRecordBatchStream,
840        index_store: &dyn IndexStore,
841        options: Option<ZoneMapIndexBuilderParams>,
842    ) -> Result<()> {
843        // train_zonemap_index: calling scan_aligned_chunks
844        let value_type = batches_source.schema().field(0).data_type().clone();
845
846        let mut builder = ZoneMapIndexBuilder::try_new(options.unwrap_or_default(), value_type)?;
847
848        builder.train(batches_source).await?;
849
850        builder.write_index(index_store).await?;
851        Ok(())
852    }
853}
854
855pub struct ZoneMapIndexTrainingRequest {
856    pub params: ZoneMapIndexBuilderParams,
857    pub criteria: TrainingCriteria,
858}
859
860impl ZoneMapIndexTrainingRequest {
861    pub fn new(params: ZoneMapIndexBuilderParams) -> Self {
862        Self {
863            params,
864            criteria: TrainingCriteria::new(TrainingOrdering::Addresses).with_row_addr(),
865        }
866    }
867}
868
869impl TrainingRequest for ZoneMapIndexTrainingRequest {
870    fn as_any(&self) -> &dyn std::any::Any {
871        self
872    }
873    fn criteria(&self) -> &TrainingCriteria {
874        &self.criteria
875    }
876}
877
878#[async_trait]
879impl ScalarIndexPlugin for ZoneMapIndexPlugin {
880    fn name(&self) -> &str {
881        "ZoneMap"
882    }
883
884    fn new_training_request(
885        &self,
886        params: &str,
887        field: &Field,
888    ) -> Result<Box<dyn TrainingRequest>> {
889        if field.data_type().is_nested() {
890            return Err(Error::invalid_input_source(
891                "A zone map index can only be created on a non-nested field.".into(),
892            ));
893        }
894
895        let params = serde_json::from_str::<ZoneMapIndexBuilderParams>(params)?;
896
897        Ok(Box::new(ZoneMapIndexTrainingRequest::new(params)))
898    }
899
900    fn provides_exact_answer(&self) -> bool {
901        false
902    }
903
904    fn version(&self) -> u32 {
905        ZONEMAP_INDEX_VERSION
906    }
907
908    fn new_query_parser(
909        &self,
910        index_name: String,
911        _index_details: &prost_types::Any,
912    ) -> Option<Box<dyn ScalarQueryParser>> {
913        Some(Box::new(SargableQueryParser::new(index_name, true)))
914    }
915
916    async fn train_index(
917        &self,
918        data: SendableRecordBatchStream,
919        index_store: &dyn IndexStore,
920        request: Box<dyn TrainingRequest>,
921        _fragment_ids: Option<Vec<u32>>,
922        _progress: Arc<dyn crate::progress::IndexBuildProgress>,
923    ) -> Result<CreatedIndex> {
924        let request = (request as Box<dyn std::any::Any>)
925            .downcast::<ZoneMapIndexTrainingRequest>()
926            .map_err(|_| {
927                Error::invalid_input_source(
928                    "must provide training request created by new_training_request".into(),
929                )
930            })?;
931        Self::train_zonemap_index(data, index_store, Some(request.params)).await?;
932        Ok(CreatedIndex {
933            index_details: prost_types::Any::from_msg(&pbold::ZoneMapIndexDetails::default())
934                .unwrap(),
935            index_version: ZONEMAP_INDEX_VERSION,
936            files: Some(index_store.list_files_with_sizes().await?),
937        })
938    }
939
940    async fn load_index(
941        &self,
942        index_store: Arc<dyn IndexStore>,
943        _index_details: &prost_types::Any,
944        frag_reuse_index: Option<Arc<FragReuseIndex>>,
945        cache: &LanceCache,
946    ) -> Result<Arc<dyn ScalarIndex>> {
947        Ok(ZoneMapIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
948    }
949}
950
951#[cfg(test)]
952mod tests {
953    use crate::scalar::registry::VALUE_COLUMN_NAME;
954    use crate::scalar::{IndexStore, zonemap::ROWS_PER_ZONE_DEFAULT};
955    use std::sync::Arc;
956
957    use crate::scalar::zoned::ZoneBound;
958    use crate::scalar::zonemap::{ZoneMapIndexPlugin, ZoneMapStatistics};
959    use arrow::datatypes::Float32Type;
960    use arrow_array::{Array, RecordBatch, UInt64Array, record_batch};
961    use arrow_schema::{DataType, Field, Schema};
962    use datafusion::execution::SendableRecordBatchStream;
963    use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
964    use datafusion_common::ScalarValue;
965    use futures::{StreamExt, TryStreamExt, stream};
966    use lance_core::utils::mask::NullableRowAddrSet;
967    use lance_core::utils::tempfile::TempObjDir;
968    use lance_core::{
969        ROW_ADDR,
970        cache::{LanceCache, WeakLanceCache},
971        utils::mask::RowAddrTreeMap,
972    };
973    use lance_datafusion::datagen::DatafusionDatagenExt;
974    use lance_datagen::ArrayGeneratorExt;
975    use lance_datagen::{BatchCount, RowCount, array};
976    use lance_io::object_store::ObjectStore;
977
978    use crate::scalar::{
979        SargableQuery, ScalarIndex, SearchResult,
980        lance_format::LanceIndexStore,
981        zonemap::{
982            ZONEMAP_FILENAME, ZONEMAP_SIZE_META_KEY, ZoneMapIndex, ZoneMapIndexBuilderParams,
983        },
984    };
985
986    // Add missing imports for the tests
987    use crate::Index; // Import Index trait to access calculate_included_frags
988    use crate::metrics::NoOpMetricsCollector;
989    use roaring::RoaringBitmap; // Import RoaringBitmap for the test
990    use std::collections::Bound;
991
992    // Adds a _rowaddr column emulating each batch as a new fragment
993    fn add_row_addr(stream: SendableRecordBatchStream) -> SendableRecordBatchStream {
994        let schema = stream.schema();
995        let schema_with_row_addr = Arc::new(Schema::new(vec![
996            schema.field(0).clone(),
997            Field::new(ROW_ADDR, DataType::UInt64, false),
998        ]));
999        let schema = schema_with_row_addr.clone();
1000        let stream = stream.enumerate().map(move |(frag_id, batch)| {
1001            let batch = batch.unwrap();
1002            let row_addr = Arc::new(UInt64Array::from_iter_values(
1003                (0..batch.num_rows() as u64).map(|off| off + ((frag_id as u64) << 32)),
1004            ));
1005            Ok(RecordBatch::try_new(
1006                schema_with_row_addr.clone(),
1007                vec![batch.column(0).clone(), row_addr],
1008            )?)
1009        });
1010        Box::pin(RecordBatchStreamAdapter::new(schema, stream))
1011    }
1012
1013    #[tokio::test]
1014    async fn test_empty_zonemap_index() {
1015        let tmpdir = TempObjDir::default();
1016        let test_store = Arc::new(LanceIndexStore::new(
1017            Arc::new(ObjectStore::local()),
1018            tmpdir.clone(),
1019            Arc::new(LanceCache::no_cache()),
1020        ));
1021
1022        let data = arrow_array::Int32Array::from(Vec::<i32>::new());
1023        let row_ids = arrow_array::UInt64Array::from(Vec::<u64>::new());
1024        let schema = Arc::new(Schema::new(vec![
1025            Field::new(VALUE_COLUMN_NAME, DataType::Int32, false),
1026            Field::new(ROW_ADDR, DataType::UInt64, false),
1027        ]));
1028        let data =
1029            RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1030
1031        let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1032            schema,
1033            stream::once(std::future::ready(Ok(data))),
1034        ));
1035
1036        ZoneMapIndexPlugin::train_zonemap_index(data_stream, test_store.as_ref(), None)
1037            .await
1038            .unwrap();
1039
1040        log::debug!("Successfully wrote the index file");
1041
1042        // Read the index file back and check its contents
1043        let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1044            .await
1045            .expect("Failed to load ZoneMapIndex");
1046        assert_eq!(index.zones.len(), 0);
1047        assert_eq!(index.data_type, DataType::Int32);
1048        assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
1049
1050        // Equals query: null (should match nothing, as there are no nulls)
1051        let query = SargableQuery::Equals(ScalarValue::Int32(None));
1052        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1053        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1054    }
1055
1056    #[tokio::test]
1057    // Test that a zonemap index can be created with null values from few fragments
1058    async fn test_null_zonemap_index() {
1059        let tmpdir = TempObjDir::default();
1060        let test_store = Arc::new(LanceIndexStore::new(
1061            Arc::new(ObjectStore::local()),
1062            tmpdir.clone(),
1063            Arc::new(LanceCache::no_cache()),
1064        ));
1065
1066        let stream = lance_datagen::gen_batch()
1067            .col(
1068                VALUE_COLUMN_NAME,
1069                array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
1070            )
1071            .into_df_stream(RowCount::from(5000), BatchCount::from(10));
1072
1073        // Add _rowaddr column
1074        let stream = add_row_addr(stream);
1075
1076        ZoneMapIndexPlugin::train_zonemap_index(
1077            stream,
1078            test_store.as_ref(),
1079            Some(ZoneMapIndexBuilderParams::new(5000)),
1080        )
1081        .await
1082        .unwrap();
1083
1084        log::debug!("Successfully wrote the index file");
1085
1086        // Read the index file back and check its contents
1087        let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1088            .await
1089            .expect("Failed to load ZoneMapIndex");
1090        assert_eq!(index.zones.len(), 10);
1091        for (i, zone) in index.zones.iter().enumerate() {
1092            assert_eq!(zone.null_count, 1000);
1093            assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1094            assert_eq!(zone.bound.length, 5000);
1095            assert_eq!(zone.bound.fragment_id, i as u64);
1096        }
1097
1098        // Equals query: null (should match all zones since they contain null values)
1099        let query = SargableQuery::Equals(ScalarValue::Int32(None));
1100        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1101
1102        // Create expected RowAddrTreeMap with all zones since they contain null values
1103        let mut expected = RowAddrTreeMap::new();
1104        for fragment_id in 0..10 {
1105            let start = (fragment_id as u64) << 32;
1106            let end = start + 5000;
1107            expected.insert_range(start..end);
1108        }
1109        assert_eq!(result, SearchResult::at_most(expected));
1110
1111        // Test update - add new data with Float32 values (matching the original data type)
1112        let new_data =
1113            arrow_array::Float32Array::from_iter_values((0..5000).map(|i| i as f32 / 1000.0));
1114        // Create row addresses for fragment 10 (next fragment after 0-9)
1115        let new_row_addr =
1116            UInt64Array::from_iter_values((0..5000).map(|i| (10u64 << 32) | (i as u64)));
1117        let new_schema = Arc::new(Schema::new(vec![
1118            Field::new(VALUE_COLUMN_NAME, DataType::Float32, false), // Match original schema
1119            Field::new(ROW_ADDR, DataType::UInt64, false), // Use _rowaddr as expected by the builder
1120        ]));
1121        let new_data_batch = RecordBatch::try_new(
1122            new_schema.clone(),
1123            vec![Arc::new(new_data), Arc::new(new_row_addr)],
1124        )
1125        .unwrap();
1126        let new_data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1127            new_schema,
1128            stream::once(std::future::ready(Ok(new_data_batch))),
1129        ));
1130
1131        // Directly pass the stream with proper row addresses instead of using MockTrainingSource
1132        // which would regenerate row addresses starting from 0
1133        index
1134            .update(new_data_stream, test_store.as_ref(), None)
1135            .await
1136            .unwrap();
1137
1138        // Verify the updated index has more zones
1139        let updated_index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1140            .await
1141            .expect("Failed to load updated ZoneMapIndex");
1142
1143        // Should have original 10 zones + 1 new zone (5000 rows with zone size 5000)
1144        assert_eq!(updated_index.zones.len(), 11);
1145
1146        // Verify the new zone was added
1147        let new_zone = &updated_index.zones[10]; // Last zone should be the new one
1148        assert_eq!(new_zone.bound.fragment_id, 10u64); // New fragment ID
1149        assert_eq!(new_zone.bound.length, 5000);
1150        assert_eq!(new_zone.null_count, 0); // New data has no nulls
1151        assert_eq!(new_zone.nan_count, 0); // New data has no NaN values
1152
1153        // Test search on updated index - search for null values should still work
1154        let query = SargableQuery::Equals(ScalarValue::Float32(None));
1155        let result = updated_index
1156            .search(&query, &NoOpMetricsCollector)
1157            .await
1158            .unwrap();
1159
1160        // Should match original 10 zones (with nulls) but not the new zone (no nulls)
1161        let mut expected = RowAddrTreeMap::new();
1162        for fragment_id in 0..10 {
1163            let start = (fragment_id as u64) << 32;
1164            let end = start + 5000;
1165            expected.insert_range(start..end);
1166        }
1167        assert_eq!(result, SearchResult::at_most(expected));
1168
1169        // Test search for a value that should be in the new zone
1170        let query = SargableQuery::Equals(ScalarValue::Float32(Some(2.5))); // Value 2500/1000 = 2.5
1171        let result = updated_index
1172            .search(&query, &NoOpMetricsCollector)
1173            .await
1174            .unwrap();
1175
1176        // Should match the new zone (fragment 10)
1177        let mut expected = RowAddrTreeMap::new();
1178        let start = 10u64 << 32;
1179        let end = start + 5000;
1180        expected.insert_range(start..end);
1181        assert_eq!(result, SearchResult::at_most(expected));
1182    }
1183
1184    #[tokio::test]
1185    async fn test_zonemap_null_handling_in_queries() {
1186        // Test that zonemap index correctly returns null_list for queries
1187        let tmpdir = TempObjDir::default();
1188        let store = Arc::new(LanceIndexStore::new(
1189            Arc::new(ObjectStore::local()),
1190            tmpdir.clone(),
1191            Arc::new(LanceCache::no_cache()),
1192        ));
1193
1194        // Create test data: [0, 5, null]
1195        let batch = record_batch!(
1196            (VALUE_COLUMN_NAME, Int64, [Some(0), Some(5), None]),
1197            (ROW_ADDR, UInt64, [0, 1, 2])
1198        )
1199        .unwrap();
1200        let schema = batch.schema();
1201        let stream = stream::once(async move { Ok(batch) });
1202        let stream = Box::pin(RecordBatchStreamAdapter::new(schema, stream));
1203
1204        // Train and write the zonemap index
1205        ZoneMapIndexPlugin::train_zonemap_index(stream, store.as_ref(), None)
1206            .await
1207            .unwrap();
1208
1209        let cache = LanceCache::with_capacity(1024 * 1024);
1210        let index = ZoneMapIndex::load(store.clone(), None, &cache)
1211            .await
1212            .unwrap();
1213
1214        // Test 1: Search for value 5 - zonemap should return at_most with all rows
1215        // Since ZoneMap returns AtMost (superset), it's correct to include nulls in the result
1216        let query = SargableQuery::Equals(ScalarValue::Int64(Some(5)));
1217        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1218
1219        match result {
1220            SearchResult::AtMost(row_ids) => {
1221                // Zonemap can't determine exact matches, so it returns all rows in the zone
1222                // This includes nulls because ZoneMap can't prove they don't match
1223                let all_rows: Vec<u64> = row_ids
1224                    .true_rows()
1225                    .row_addrs()
1226                    .unwrap()
1227                    .map(u64::from)
1228                    .collect();
1229                assert_eq!(
1230                    all_rows,
1231                    vec![0, 1, 2],
1232                    "Should return all rows (including nulls) since ZoneMap is inexact"
1233                );
1234
1235                // For AtMost results, nulls are included in the superset
1236                // Downstream processing will handle null filtering
1237            }
1238            _ => panic!("Expected AtMost search result from zonemap"),
1239        }
1240
1241        // Test 2: Range query - should also return all rows as AtMost
1242        let query = SargableQuery::Range(
1243            std::ops::Bound::Included(ScalarValue::Int64(Some(0))),
1244            std::ops::Bound::Included(ScalarValue::Int64(Some(3))),
1245        );
1246        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1247
1248        match result {
1249            SearchResult::AtMost(row_ids) => {
1250                // Again, ZoneMap returns superset including nulls
1251                let all_rows: Vec<u64> = row_ids
1252                    .true_rows()
1253                    .row_addrs()
1254                    .unwrap()
1255                    .map(u64::from)
1256                    .collect();
1257                assert_eq!(
1258                    all_rows,
1259                    vec![0, 1, 2],
1260                    "Should return all rows in zone as possible matches"
1261                );
1262            }
1263            _ => panic!("Expected AtMost search result from zonemap"),
1264        }
1265    }
1266
1267    #[tokio::test]
1268    async fn test_nan_zonemap_index() {
1269        let tmpdir = TempObjDir::default();
1270        let test_store = Arc::new(LanceIndexStore::new(
1271            Arc::new(ObjectStore::local()),
1272            tmpdir.clone(),
1273            Arc::new(LanceCache::no_cache()),
1274        ));
1275
1276        // Create deterministic data with NaN values
1277        // Pattern: [1.0, 2.0, NaN, 3.0, 4.0, 5.0, NaN, 6.0, 7.0, 8.0, ...]
1278        let mut values = Vec::new();
1279        for i in 0..500 {
1280            if i % 5 == 2 {
1281                values.push(f32::NAN);
1282            } else {
1283                // Other values are sequential numbers
1284                values.push(i as f32);
1285            }
1286        }
1287
1288        let float_data = arrow_array::Float32Array::from(values);
1289        let row_ids = UInt64Array::from_iter_values((0..float_data.len()).map(|i| i as u64));
1290        let schema = Arc::new(Schema::new(vec![
1291            Field::new(VALUE_COLUMN_NAME, DataType::Float32, true),
1292            Field::new(ROW_ADDR, DataType::UInt64, false),
1293        ]));
1294        let data = RecordBatch::try_new(
1295            schema.clone(),
1296            vec![Arc::new(float_data.clone()), Arc::new(row_ids)],
1297        )
1298        .unwrap();
1299        let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1300            schema,
1301            stream::once(std::future::ready(Ok(data))),
1302        ));
1303
1304        ZoneMapIndexPlugin::train_zonemap_index(
1305            data_stream,
1306            test_store.as_ref(),
1307            Some(ZoneMapIndexBuilderParams::new(100)),
1308        )
1309        .await
1310        .unwrap();
1311
1312        // Load the index
1313        let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1314            .await
1315            .expect("Failed to load ZoneMapIndex");
1316
1317        // Should have 5 zones since we have 500 rows and zone size is 100
1318        assert_eq!(index.zones.len(), 5);
1319
1320        // Check that each zone has the expected NaN count
1321        // Each zone has 100 values, and every 5th value (indices 2, 7, 12, ...) is NaN
1322        // So each zone should have 20 NaN values (100/5 = 20)
1323        for (i, zone) in index.zones.iter().enumerate() {
1324            assert_eq!(zone.nan_count, 20, "Zone {} should have 20 NaN values", i);
1325            assert_eq!(
1326                zone.bound.length, 100,
1327                "Zone {} should have zone_length 100",
1328                i
1329            );
1330            assert_eq!(
1331                zone.bound.fragment_id, 0u64,
1332                "Zone {} should have fragment_id 0",
1333                i
1334            );
1335        }
1336
1337        // Test search for NaN values using Equals with NaN
1338        let query = SargableQuery::Equals(ScalarValue::Float32(Some(f32::NAN)));
1339        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1340
1341        // Should match all zones since they all contain NaN values
1342        let mut expected = RowAddrTreeMap::new();
1343        expected.insert_range(0..500); // All rows since NaN is in every zone
1344        assert_eq!(result, SearchResult::at_most(expected));
1345
1346        // Test search for a specific finite value that exists in the data
1347        let query = SargableQuery::Equals(ScalarValue::Float32(Some(5.0)));
1348        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1349
1350        // Should match only the first zone since 5.0 only exists in rows 0-99
1351        let mut expected = RowAddrTreeMap::new();
1352        expected.insert_range(0..100);
1353        assert_eq!(result, SearchResult::at_most(expected));
1354
1355        // Test search for a value that doesn't exist
1356        let query = SargableQuery::Equals(ScalarValue::Float32(Some(1000.0)));
1357        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1358
1359        // Since zones contain NaN values, their max will be NaN, so they will be included
1360        // as potential matches for any finite target (false positive, but acceptable for zone maps)
1361        let mut expected = RowAddrTreeMap::new();
1362        expected.insert_range(0..500);
1363        assert_eq!(result, SearchResult::at_most(expected));
1364
1365        // Test range query that should include finite values
1366        let query = SargableQuery::Range(
1367            Bound::Included(ScalarValue::Float32(Some(0.0))),
1368            Bound::Included(ScalarValue::Float32(Some(250.0))),
1369        );
1370        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1371
1372        // Should match the first three zones since they contain values in the range [0, 250]
1373        let mut expected = RowAddrTreeMap::new();
1374        expected.insert_range(0..300);
1375        assert_eq!(result, SearchResult::at_most(expected));
1376
1377        // Test IsIn query with NaN and finite values
1378        let query = SargableQuery::IsIn(vec![
1379            ScalarValue::Float32(Some(f32::NAN)),
1380            ScalarValue::Float32(Some(5.0)),
1381            ScalarValue::Float32(Some(150.0)), // This value exists in the second zone
1382        ]);
1383        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1384
1385        // Should match all zones since they all contain NaN values
1386        let mut expected = RowAddrTreeMap::new();
1387        expected.insert_range(0..500);
1388        assert_eq!(result, SearchResult::at_most(expected));
1389
1390        // Test range query that excludes all values
1391        let query = SargableQuery::Range(
1392            Bound::Included(ScalarValue::Float32(Some(1000.0))),
1393            Bound::Included(ScalarValue::Float32(Some(2000.0))),
1394        );
1395        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1396
1397        // Since zones contain NaN values, their max will be NaN, so they will be included
1398        // as potential matches for any range query (false positive, but acceptable for zone maps)
1399        let mut expected = RowAddrTreeMap::new();
1400        expected.insert_range(0..500);
1401        assert_eq!(result, SearchResult::at_most(expected));
1402
1403        // Test IsNull query (should match nothing since there are no null values)
1404        let query = SargableQuery::IsNull();
1405        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1406        assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty()));
1407
1408        // Test range queries with NaN bounds
1409        // Range with NaN as start bound (included)
1410        let query = SargableQuery::Range(
1411            Bound::Included(ScalarValue::Float32(Some(f32::NAN))),
1412            Bound::Unbounded,
1413        );
1414        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1415        // Should match all zones since they all contain NaN values
1416        let mut expected = RowAddrTreeMap::new();
1417        expected.insert_range(0..500);
1418        assert_eq!(result, SearchResult::at_most(expected));
1419
1420        // Range with NaN as end bound (included)
1421        let query = SargableQuery::Range(
1422            Bound::Unbounded,
1423            Bound::Included(ScalarValue::Float32(Some(f32::NAN))),
1424        );
1425        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1426        // Should match all zones since they all contain NaN values
1427        let mut expected = RowAddrTreeMap::new();
1428        expected.insert_range(0..500);
1429        assert_eq!(result, SearchResult::at_most(expected));
1430
1431        // Range with NaN as end bound (excluded)
1432        let query = SargableQuery::Range(
1433            Bound::Unbounded,
1434            Bound::Excluded(ScalarValue::Float32(Some(f32::NAN))),
1435        );
1436        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1437        // Should match all zones since everything is less than NaN
1438        let mut expected = RowAddrTreeMap::new();
1439        expected.insert_range(0..500);
1440        assert_eq!(result, SearchResult::at_most(expected));
1441
1442        // Range with NaN as start bound (excluded)
1443        let query = SargableQuery::Range(
1444            Bound::Excluded(ScalarValue::Float32(Some(f32::NAN))),
1445            Bound::Unbounded,
1446        );
1447        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1448        // Should match nothing since nothing is greater than NaN
1449        assert_eq!(result, SearchResult::AtMost(NullableRowAddrSet::empty()));
1450
1451        // Test IsIn query with mixed float types (Float16, Float32, Float64)
1452        let query = SargableQuery::IsIn(vec![
1453            ScalarValue::Float16(Some(half::f16::NAN)),
1454            ScalarValue::Float32(Some(f32::NAN)),
1455            ScalarValue::Float64(Some(f64::NAN)),
1456            ScalarValue::Float32(Some(5.0)),
1457        ]);
1458        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1459        // Should match all zones since they all contain NaN values
1460        let mut expected = RowAddrTreeMap::new();
1461        expected.insert_range(0..500);
1462        assert_eq!(result, SearchResult::at_most(expected));
1463    }
1464
1465    #[tokio::test]
1466    // Test data that belongs to the same fragment but coming from different batches
1467    async fn test_basic_zonemap_index() {
1468        let tmpdir = TempObjDir::default();
1469        let test_store = Arc::new(LanceIndexStore::new(
1470            Arc::new(ObjectStore::local()),
1471            tmpdir.clone(),
1472            Arc::new(LanceCache::no_cache()),
1473        ));
1474
1475        let data = arrow_array::Int32Array::from_iter_values(0..=100);
1476        let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64));
1477        let schema = Arc::new(Schema::new(vec![
1478            Field::new(VALUE_COLUMN_NAME, DataType::Int32, false),
1479            Field::new(ROW_ADDR, DataType::UInt64, false),
1480        ]));
1481        let data =
1482            RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1483        let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1484            schema,
1485            stream::once(std::future::ready(Ok(data))),
1486        ));
1487
1488        ZoneMapIndexPlugin::train_zonemap_index(
1489            data_stream,
1490            test_store.as_ref(),
1491            Some(ZoneMapIndexBuilderParams::new(100)),
1492        )
1493        .await
1494        .unwrap();
1495
1496        log::debug!("Successfully wrote the index file");
1497
1498        // Read the raw index file back and check its contents
1499        let index_file = test_store.open_index_file(ZONEMAP_FILENAME).await.unwrap();
1500        // Print the metadata from the index_file
1501        let metadata = index_file.schema().metadata.clone();
1502        let record_batch = index_file
1503            .read_record_batch(0, index_file.num_rows() as u64)
1504            .await
1505            .unwrap();
1506        assert_eq!(record_batch.num_rows(), 2);
1507        assert_eq!(
1508            record_batch
1509                .column(0)
1510                .as_any()
1511                .downcast_ref::<arrow_array::Int32Array>()
1512                .unwrap()
1513                .values(),
1514            &[0, 100]
1515        );
1516        assert_eq!(
1517            record_batch
1518                .column(1)
1519                .as_any()
1520                .downcast_ref::<arrow_array::Int32Array>()
1521                .unwrap()
1522                .values(),
1523            &[99, 100]
1524        );
1525        assert_eq!(
1526            record_batch
1527                .column(2)
1528                .as_any()
1529                .downcast_ref::<arrow_array::UInt32Array>()
1530                .unwrap()
1531                .values(),
1532            &[0, 0]
1533        );
1534        assert_eq!(metadata.get(ZONEMAP_SIZE_META_KEY).unwrap(), "100");
1535
1536        // Read the index file back and check its contents
1537        let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1538            .await
1539            .expect("Failed to load ZoneMapIndex");
1540        assert_eq!(index.zones.len(), 2);
1541        assert_eq!(
1542            index.zones,
1543            vec![
1544                ZoneMapStatistics {
1545                    min: ScalarValue::Int32(Some(0)),
1546                    max: ScalarValue::Int32(Some(99)),
1547                    null_count: 0,
1548                    nan_count: 0,
1549                    bound: ZoneBound {
1550                        fragment_id: 0,
1551                        start: 0,
1552                        length: 100,
1553                    },
1554                },
1555                ZoneMapStatistics {
1556                    min: ScalarValue::Int32(Some(100)),
1557                    max: ScalarValue::Int32(Some(100)),
1558                    null_count: 0,
1559                    nan_count: 0,
1560                    bound: ZoneBound {
1561                        fragment_id: 0,
1562                        start: 100,
1563                        length: 1,
1564                    },
1565                }
1566            ]
1567        );
1568        // Verify nan_count is 0 for all zones (no NaN values in integer data)
1569        for (i, zone) in index.zones.iter().enumerate() {
1570            assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1571        }
1572
1573        assert_eq!(index.data_type, DataType::Int32);
1574        assert_eq!(index.rows_per_zone, 100);
1575        assert_eq!(
1576            index.calculate_included_frags().await.unwrap(),
1577            RoaringBitmap::from_iter(0..1)
1578        );
1579
1580        // Test search functionality
1581
1582        // 1. Range query: (50, +inf)
1583        let query = SargableQuery::Range(
1584            Bound::Excluded(ScalarValue::Int32(Some(50))),
1585            Bound::Unbounded,
1586        );
1587        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1588        assert_eq!(result, SearchResult::at_most(0..=100));
1589
1590        // 2. Range query: [0, 50]
1591        let query = SargableQuery::Range(
1592            Bound::Included(ScalarValue::Int32(Some(0))),
1593            Bound::Included(ScalarValue::Int32(Some(50))),
1594        );
1595        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1596        assert_eq!(result, SearchResult::at_most(0..=99));
1597
1598        // 3. Range query: [101, 200] (should only match the second zone, which is row 100)
1599        let query = SargableQuery::Range(
1600            Bound::Included(ScalarValue::Int32(Some(101))),
1601            Bound::Included(ScalarValue::Int32(Some(200))),
1602        );
1603        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1604        // Only row 100 is in the second zone, but its value is 100, so this should be empty
1605        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1606
1607        // 4. Range query: [100, 100] (should match only the last row)
1608        let query = SargableQuery::Range(
1609            Bound::Included(ScalarValue::Int32(Some(100))),
1610            Bound::Included(ScalarValue::Int32(Some(100))),
1611        );
1612        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1613        assert_eq!(result, SearchResult::at_most(100..=100));
1614
1615        // 5. Equals query: 0 (should match first row)
1616        let query = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
1617        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1618        assert_eq!(result, SearchResult::at_most(0..=99));
1619
1620        // 6. Equals query: 100 (should match only last row)
1621        let query = SargableQuery::Equals(ScalarValue::Int32(Some(100)));
1622        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1623        assert_eq!(result, SearchResult::at_most(100..=100));
1624
1625        // 7. Equals query: 101 (should match nothing)
1626        let query = SargableQuery::Equals(ScalarValue::Int32(Some(101)));
1627        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1628        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1629
1630        // 8. IsNull query (no nulls in data, should match nothing)
1631        let query = SargableQuery::IsNull();
1632        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1633        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1634        // 9. IsIn query: [0, 100, 101, 50]
1635        let query = SargableQuery::IsIn(vec![
1636            ScalarValue::Int32(Some(0)),
1637            ScalarValue::Int32(Some(100)),
1638            ScalarValue::Int32(Some(101)),
1639            ScalarValue::Int32(Some(50)),
1640        ]);
1641        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1642        // 0 and 50 are in the first zone, 100 in the second, 101 is not present
1643        assert_eq!(result, SearchResult::at_most(0..=100));
1644
1645        // 10. IsIn query: [101, 102] (should match nothing)
1646        let query = SargableQuery::IsIn(vec![
1647            ScalarValue::Int32(Some(101)),
1648            ScalarValue::Int32(Some(102)),
1649        ]);
1650        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1651        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1652
1653        // 11. IsIn query: [null] (should match nothing, as there are no nulls)
1654        let query = SargableQuery::IsIn(vec![ScalarValue::Int32(None)]);
1655        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1656        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1657
1658        // 12. Equals query: null (should match nothing, as there are no nulls)
1659        let query = SargableQuery::Equals(ScalarValue::Int32(None));
1660        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1661        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1662    }
1663
1664    #[tokio::test]
1665    // Test zonemap with same fragment from multiple batches
1666    async fn test_complex_zonemap_index() {
1667        let tmpdir = TempObjDir::default();
1668        let test_store = Arc::new(LanceIndexStore::new(
1669            Arc::new(ObjectStore::local()),
1670            tmpdir.clone(),
1671            Arc::new(LanceCache::no_cache()),
1672        ));
1673
1674        // Create data that will produce the expected zonemap zones
1675        let data =
1676            arrow_array::Int64Array::from_iter_values(0..(ROWS_PER_ZONE_DEFAULT * 2 + 42) as i64);
1677        let row_ids = UInt64Array::from_iter_values((0..data.len()).map(|i| i as u64));
1678        let schema = Arc::new(Schema::new(vec![
1679            Field::new(VALUE_COLUMN_NAME, DataType::Int64, false),
1680            Field::new(ROW_ADDR, DataType::UInt64, false),
1681        ]));
1682        let data =
1683            RecordBatch::try_new(schema.clone(), vec![Arc::new(data), Arc::new(row_ids)]).unwrap();
1684        let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1685            schema,
1686            stream::once(std::future::ready(Ok(data))),
1687        ));
1688
1689        ZoneMapIndexPlugin::train_zonemap_index(
1690            data_stream,
1691            test_store.as_ref(),
1692            Some(ZoneMapIndexBuilderParams::default()),
1693        )
1694        .await
1695        .unwrap();
1696
1697        log::debug!("Successfully wrote the index file");
1698
1699        // Read the index file back and check its contents
1700        let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1701            .await
1702            .expect("Failed to load ZoneMapIndex");
1703        assert_eq!(index.zones.len(), 3);
1704        assert_eq!(
1705            index.zones,
1706            vec![
1707                ZoneMapStatistics {
1708                    min: ScalarValue::Int64(Some(0)),
1709                    max: ScalarValue::Int64(Some(8191)),
1710                    null_count: 0,
1711                    nan_count: 0,
1712                    bound: ZoneBound {
1713                        fragment_id: 0,
1714                        start: 0,
1715                        length: 8192,
1716                    },
1717                },
1718                ZoneMapStatistics {
1719                    min: ScalarValue::Int64(Some(8192)),
1720                    max: ScalarValue::Int64(Some(16383)),
1721                    null_count: 0,
1722                    nan_count: 0,
1723                    bound: ZoneBound {
1724                        fragment_id: 0,
1725                        start: 8192,
1726                        length: 8192,
1727                    },
1728                },
1729                ZoneMapStatistics {
1730                    min: ScalarValue::Int64(Some(16384)),
1731                    max: ScalarValue::Int64(Some(16425)),
1732                    null_count: 0,
1733                    nan_count: 0,
1734                    bound: ZoneBound {
1735                        fragment_id: 0,
1736                        start: 16384,
1737                        length: 42,
1738                    },
1739                }
1740            ]
1741        );
1742        // Verify nan_count is 0 for all zones (no NaN values in integer data)
1743        for (i, zone) in index.zones.iter().enumerate() {
1744            assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1745        }
1746
1747        assert_eq!(index.data_type, DataType::Int64);
1748        assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
1749        assert_eq!(
1750            index.calculate_included_frags().await.unwrap(),
1751            RoaringBitmap::from_iter(0..1)
1752        );
1753
1754        // TODO: Test search functionality
1755        // Test search functionality
1756
1757        // Search for a value in the first zone
1758        let query = SargableQuery::Equals(ScalarValue::Int64(Some(1000)));
1759        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1760        // Should match row 1000 in fragment 0: row address = (0 << 32) + 1000 = 1000
1761        let mut expected = RowAddrTreeMap::new();
1762        expected.insert_range(0..=8191);
1763        assert_eq!(result, SearchResult::at_most(expected));
1764
1765        // Search for a value in the second zone
1766        let query = SargableQuery::Equals(ScalarValue::Int64(Some(9000)));
1767        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1768        // Should match row 9000 in fragment 0: row address = (0 << 32) + 9000 = 9000
1769        let mut expected = RowAddrTreeMap::new();
1770        expected.insert_range(8192..=16383);
1771        assert_eq!(result, SearchResult::at_most(expected));
1772
1773        // Search for a value not present in any zone
1774        let query = SargableQuery::Equals(ScalarValue::Int64(Some(20000)));
1775        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1776        assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
1777
1778        // Search for a range that spans multiple zones
1779        let query = SargableQuery::Range(
1780            Bound::Included(ScalarValue::Int64(Some(9000))),
1781            Bound::Included(ScalarValue::Int64(Some(16400))),
1782        );
1783        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
1784        // Should match all rows from 8000 to 16400 (inclusive)
1785        let mut expected = RowAddrTreeMap::new();
1786        expected.insert_range(8192..=16425);
1787        assert_eq!(result, SearchResult::at_most(expected));
1788    }
1789
1790    #[tokio::test]
1791    // Test zonemap with multiple fragments from different batches
1792    async fn test_multiple_fragments_zonemap() {
1793        let tmpdir = TempObjDir::default();
1794        let test_store = Arc::new(LanceIndexStore::new(
1795            Arc::new(ObjectStore::local()),
1796            tmpdir.clone(),
1797            Arc::new(LanceCache::no_cache()),
1798        ));
1799
1800        let schema = Arc::new(Schema::new(vec![
1801            Field::new(VALUE_COLUMN_NAME, DataType::Int64, false),
1802            Field::new(ROW_ADDR, DataType::UInt64, false),
1803        ]));
1804
1805        // Create multiple fragments with data that will produce expected zones
1806        // Fragment 0: values 0-8191 (first zone)
1807        let fragment0_data =
1808            arrow_array::Int64Array::from_iter_values(0..ROWS_PER_ZONE_DEFAULT as i64);
1809        let fragment0_row_ids = UInt64Array::from_iter_values(0..ROWS_PER_ZONE_DEFAULT);
1810        let fragment0_batch = RecordBatch::try_new(
1811            schema.clone(),
1812            vec![Arc::new(fragment0_data), Arc::new(fragment0_row_ids)],
1813        )
1814        .unwrap();
1815
1816        // Fragment 1: values 8192-16383 (second zone)
1817        let fragment1_data = arrow_array::Int64Array::from_iter_values(
1818            (ROWS_PER_ZONE_DEFAULT as i64)..((ROWS_PER_ZONE_DEFAULT * 2) as i64),
1819        );
1820        let fragment1_row_ids =
1821            UInt64Array::from_iter_values((0..ROWS_PER_ZONE_DEFAULT).map(|i| i + (1 << 32)));
1822        let fragment1_batch = RecordBatch::try_new(
1823            schema.clone(),
1824            vec![Arc::new(fragment1_data), Arc::new(fragment1_row_ids)],
1825        )
1826        .unwrap();
1827
1828        // Fragment 2: values 16384-16426 (third zone)
1829        let fragment2_data = arrow_array::Int64Array::from_iter_values(
1830            ((ROWS_PER_ZONE_DEFAULT * 2) as i64)..((ROWS_PER_ZONE_DEFAULT * 2 + 42) as i64),
1831        );
1832        let fragment2_row_ids =
1833            UInt64Array::from_iter_values((0..42).map(|i| (i as u64) + (2 << 32)));
1834        let fragment2_batch = RecordBatch::try_new(
1835            schema.clone(),
1836            vec![Arc::new(fragment2_data), Arc::new(fragment2_row_ids)],
1837        )
1838        .unwrap();
1839
1840        // Each fragment is broken into few batches
1841        {
1842            // Create a stream with multiple batches (fragments)
1843            let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
1844                schema.clone(),
1845                stream::iter(vec![
1846                    Ok(fragment0_batch.clone()),
1847                    Ok(fragment1_batch.clone()),
1848                    Ok(fragment2_batch.clone()),
1849                ]),
1850            ));
1851            ZoneMapIndexPlugin::train_zonemap_index(
1852                data_stream,
1853                test_store.as_ref(),
1854                Some(ZoneMapIndexBuilderParams::new(5000)),
1855            )
1856            .await
1857            .unwrap();
1858
1859            // Read the index file back and check its contents
1860            let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
1861                .await
1862                .expect("Failed to load ZoneMapIndex");
1863            assert_eq!(index.zones.len(), 5);
1864            assert_eq!(
1865                index.zones,
1866                vec![
1867                    ZoneMapStatistics {
1868                        min: ScalarValue::Int64(Some(0)),
1869                        max: ScalarValue::Int64(Some(4999)),
1870                        null_count: 0,
1871                        nan_count: 0,
1872                        bound: ZoneBound {
1873                            fragment_id: 0,
1874                            start: 0,
1875                            length: 5000,
1876                        },
1877                    },
1878                    ZoneMapStatistics {
1879                        min: ScalarValue::Int64(Some(5000)),
1880                        max: ScalarValue::Int64(Some(8191)),
1881                        null_count: 0,
1882                        nan_count: 0,
1883                        bound: ZoneBound {
1884                            fragment_id: 0,
1885                            start: 5000,
1886                            length: 3192,
1887                        },
1888                    },
1889                    ZoneMapStatistics {
1890                        min: ScalarValue::Int64(Some(8192)),
1891                        max: ScalarValue::Int64(Some(13191)),
1892                        null_count: 0,
1893                        nan_count: 0,
1894                        bound: ZoneBound {
1895                            fragment_id: 1,
1896                            start: 0,
1897                            length: 5000,
1898                        },
1899                    },
1900                    ZoneMapStatistics {
1901                        min: ScalarValue::Int64(Some(13192)),
1902                        max: ScalarValue::Int64(Some(16383)),
1903                        null_count: 0,
1904                        nan_count: 0,
1905                        bound: ZoneBound {
1906                            fragment_id: 1,
1907                            start: 5000,
1908                            length: 3192,
1909                        },
1910                    },
1911                    ZoneMapStatistics {
1912                        min: ScalarValue::Int64(Some(16384)),
1913                        max: ScalarValue::Int64(Some(16425)),
1914                        null_count: 0,
1915                        nan_count: 0,
1916                        bound: ZoneBound {
1917                            fragment_id: 2,
1918                            start: 0,
1919                            length: 42,
1920                        },
1921                    }
1922                ]
1923            );
1924            // Verify nan_count is 0 for all zones (no NaN values in integer data)
1925            for (i, zone) in index.zones.iter().enumerate() {
1926                assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
1927            }
1928
1929            assert_eq!(index.data_type, DataType::Int64);
1930            assert_eq!(index.rows_per_zone, 5000);
1931            assert_eq!(
1932                index.calculate_included_frags().await.unwrap(),
1933                RoaringBitmap::from_iter(0..3)
1934            );
1935
1936            // Verify _rowaddr column values are properly assigned
1937            let verify_data_stream: SendableRecordBatchStream =
1938                Box::pin(RecordBatchStreamAdapter::new(
1939                    schema.clone(),
1940                    stream::iter(vec![
1941                        Ok(fragment0_batch.clone()),
1942                        Ok(fragment1_batch.clone()),
1943                        Ok(fragment2_batch.clone()),
1944                    ]),
1945                ));
1946            let batches: Vec<RecordBatch> = verify_data_stream.try_collect().await.unwrap();
1947
1948            assert_eq!(batches.len(), 3);
1949
1950            // Check fragment 0 _rowaddr values (should start from 0)
1951            let fragment0_rowaddr_col = batches[0].column_by_name(ROW_ADDR).unwrap();
1952            let fragment0_rowaddrs = fragment0_rowaddr_col
1953                .as_any()
1954                .downcast_ref::<UInt64Array>()
1955                .unwrap();
1956            assert_eq!(
1957                fragment0_rowaddrs.values().len(),
1958                ROWS_PER_ZONE_DEFAULT as usize
1959            );
1960            assert_eq!(fragment0_rowaddrs.values()[0], 0);
1961            assert_eq!(
1962                fragment0_rowaddrs.values()[fragment0_rowaddrs.values().len() - 1],
1963                8191
1964            );
1965
1966            // Check fragment 1 _rowaddr values (should start from fragment_id=1)
1967            let fragment1_rowaddr_col = batches[1].column_by_name(ROW_ADDR).unwrap();
1968            let fragment1_rowaddrs = fragment1_rowaddr_col
1969                .as_any()
1970                .downcast_ref::<UInt64Array>()
1971                .unwrap();
1972            assert_eq!(
1973                fragment1_rowaddrs.values().len(),
1974                ROWS_PER_ZONE_DEFAULT as usize
1975            );
1976            assert_eq!(fragment1_rowaddrs.values()[0], 1u64 << 32); // fragment_id=1, local_offset=0
1977            assert_eq!(
1978                fragment1_rowaddrs.values()[fragment1_rowaddrs.values().len() - 1],
1979                8191 | (1u64 << 32)
1980            );
1981
1982            // Check fragment 2 _rowaddr values (should start from fragment_id=2)
1983            let fragment2_rowaddr_col = batches[2].column_by_name(ROW_ADDR).unwrap();
1984            let fragment2_rowaddrs = fragment2_rowaddr_col
1985                .as_any()
1986                .downcast_ref::<UInt64Array>()
1987                .unwrap();
1988            assert_eq!(fragment2_rowaddrs.values().len(), 42);
1989            assert_eq!(fragment2_rowaddrs.values()[0], 2u64 << 32); // fragment_id=2, local_offset=0
1990            assert_eq!(
1991                fragment2_rowaddrs.values()[fragment2_rowaddrs.values().len() - 1],
1992                (2u64 << 32) | 41
1993            );
1994
1995            // Add a few tests for search functionality
1996
1997            // Test range query that spans multiple fragments
1998            let query = SargableQuery::Range(
1999                Bound::Included(ScalarValue::Int64(Some(5000))),
2000                Bound::Included(ScalarValue::Int64(Some(12000))),
2001            );
2002            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2003            // Should include zones from fragments 0 and 1 since they overlap with range 5000-12000
2004            let mut expected = RowAddrTreeMap::new();
2005            // zone 1
2006            expected.insert_range(5000..8192);
2007            // zone 2
2008            expected.insert_range((1u64 << 32)..((1u64 << 32) + 5000));
2009            assert_eq!(result, SearchResult::at_most(expected));
2010
2011            // Test exact match query from zone 2
2012            let query = SargableQuery::Equals(ScalarValue::Int64(Some(8192)));
2013            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2014            // Should include zone 2 since it contains value 8192
2015            let mut expected = RowAddrTreeMap::new();
2016            expected.insert_range((1u64 << 32)..((1u64 << 32) + 5000));
2017            assert_eq!(result, SearchResult::at_most(expected));
2018
2019            // Test exact match query from zone 4
2020            let query = SargableQuery::Equals(ScalarValue::Int64(Some(16385)));
2021            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2022            // Should include zone 4 since it contains value 16385
2023            let mut expected = RowAddrTreeMap::new();
2024            expected.insert_range(2u64 << 32..((2u64 << 32) + 42));
2025            assert_eq!(result, SearchResult::at_most(expected));
2026
2027            // Test query that matches nothing
2028            let query = SargableQuery::Equals(ScalarValue::Int64(Some(99999)));
2029            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2030            assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
2031
2032            // Test is_in query
2033            let query = SargableQuery::IsIn(vec![ScalarValue::Int64(Some(16385))]);
2034            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2035            let mut expected = RowAddrTreeMap::new();
2036            expected.insert_range(2u64 << 32..((2u64 << 32) + 42));
2037            assert_eq!(result, SearchResult::at_most(expected));
2038
2039            // Test equals query with null
2040            let query = SargableQuery::Equals(ScalarValue::Int64(None));
2041            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2042            let mut expected = RowAddrTreeMap::new();
2043            expected.insert_range(0..=16425);
2044            // expected = {:?}", expected
2045            assert_eq!(result, SearchResult::at_most(RowAddrTreeMap::new()));
2046        }
2047
2048        //  Each fragment is its own batch
2049        {
2050            // Create a stream with multiple batches (fragments)
2051            let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2052                schema.clone(),
2053                stream::iter(vec![
2054                    Ok(fragment0_batch.clone()),
2055                    Ok(fragment1_batch.clone()),
2056                    Ok(fragment2_batch.clone()),
2057                ]),
2058            ));
2059            ZoneMapIndexPlugin::train_zonemap_index(
2060                data_stream,
2061                test_store.as_ref(),
2062                Some(ZoneMapIndexBuilderParams::default()),
2063            )
2064            .await
2065            .unwrap();
2066
2067            // Read the index file back and check its contents
2068            let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2069                .await
2070                .expect("Failed to load ZoneMapIndex");
2071            assert_eq!(index.zones.len(), 3);
2072            assert_eq!(
2073                index.zones,
2074                vec![
2075                    ZoneMapStatistics {
2076                        min: ScalarValue::Int64(Some(0)),
2077                        max: ScalarValue::Int64(Some(8191)),
2078                        null_count: 0,
2079                        nan_count: 0,
2080                        bound: ZoneBound {
2081                            fragment_id: 0,
2082                            start: 0,
2083                            length: 8192,
2084                        },
2085                    },
2086                    ZoneMapStatistics {
2087                        min: ScalarValue::Int64(Some(8192)),
2088                        max: ScalarValue::Int64(Some(16383)),
2089                        null_count: 0,
2090                        nan_count: 0,
2091                        bound: ZoneBound {
2092                            fragment_id: 1,
2093                            start: 0,
2094                            length: 8192,
2095                        },
2096                    },
2097                    ZoneMapStatistics {
2098                        min: ScalarValue::Int64(Some(16384)),
2099                        max: ScalarValue::Int64(Some(16425)),
2100                        null_count: 0,
2101                        nan_count: 0,
2102                        bound: ZoneBound {
2103                            fragment_id: 2,
2104                            start: 0,
2105                            length: 42,
2106                        },
2107                    }
2108                ]
2109            );
2110            // Verify nan_count is 0 for all zones (no NaN values in integer data)
2111            for (i, zone) in index.zones.iter().enumerate() {
2112                assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
2113            }
2114
2115            assert_eq!(index.data_type, DataType::Int64);
2116            assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT);
2117            assert_eq!(
2118                index.calculate_included_frags().await.unwrap(),
2119                RoaringBitmap::from_iter(0..3)
2120            );
2121        }
2122
2123        //  All fragments are in the same batch
2124        {
2125            // Create a stream with multiple batches (fragments)
2126            let data_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2127                schema.clone(),
2128                stream::iter(vec![
2129                    Ok(fragment0_batch.clone()),
2130                    Ok(fragment1_batch.clone()),
2131                    Ok(fragment2_batch.clone()),
2132                ]),
2133            ));
2134            ZoneMapIndexPlugin::train_zonemap_index(
2135                data_stream,
2136                test_store.as_ref(),
2137                Some(ZoneMapIndexBuilderParams::new(ROWS_PER_ZONE_DEFAULT * 3)),
2138            )
2139            .await
2140            .unwrap();
2141
2142            // Read the index file back and check its contents
2143            let index = ZoneMapIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2144                .await
2145                .expect("Failed to load ZoneMapIndex");
2146            assert_eq!(index.zones.len(), 3);
2147            assert_eq!(
2148                index.zones,
2149                vec![
2150                    ZoneMapStatistics {
2151                        min: ScalarValue::Int64(Some(0)),
2152                        max: ScalarValue::Int64(Some(8191)),
2153                        null_count: 0,
2154                        nan_count: 0,
2155                        bound: ZoneBound {
2156                            fragment_id: 0,
2157                            start: 0,
2158                            length: 8192,
2159                        },
2160                    },
2161                    ZoneMapStatistics {
2162                        min: ScalarValue::Int64(Some(8192)),
2163                        max: ScalarValue::Int64(Some(16383)),
2164                        null_count: 0,
2165                        nan_count: 0,
2166                        bound: ZoneBound {
2167                            fragment_id: 1,
2168                            start: 0,
2169                            length: 8192,
2170                        },
2171                    },
2172                    ZoneMapStatistics {
2173                        min: ScalarValue::Int64(Some(16384)),
2174                        max: ScalarValue::Int64(Some(16425)),
2175                        null_count: 0,
2176                        nan_count: 0,
2177                        bound: ZoneBound {
2178                            fragment_id: 2,
2179                            start: 0,
2180                            length: 42,
2181                        },
2182                    }
2183                ]
2184            );
2185            // Verify nan_count is 0 for all zones (no NaN values in integer data)
2186            for (i, zone) in index.zones.iter().enumerate() {
2187                assert_eq!(zone.nan_count, 0, "Zone {} should have nan_count = 0", i);
2188            }
2189
2190            assert_eq!(index.data_type, DataType::Int64);
2191            assert_eq!(index.rows_per_zone, ROWS_PER_ZONE_DEFAULT * 3);
2192        }
2193    }
2194
2195    #[tokio::test]
2196    async fn test_fragment_id_assignment() {
2197        // Test that fragment IDs are properly assigned in _rowaddr values
2198        let schema = Arc::new(Schema::new(vec![Field::new(
2199            VALUE_COLUMN_NAME,
2200            DataType::Int32,
2201            false,
2202        )]));
2203
2204        // Create multiple fragments
2205        let fragment0_data = arrow_array::Int32Array::from_iter_values(0..5);
2206        let fragment0_batch =
2207            RecordBatch::try_new(schema.clone(), vec![Arc::new(fragment0_data)]).unwrap();
2208
2209        let fragment1_data = arrow_array::Int32Array::from_iter_values(5..10);
2210        let fragment1_batch =
2211            RecordBatch::try_new(schema.clone(), vec![Arc::new(fragment1_data)]).unwrap();
2212
2213        let aligned_stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2214            schema,
2215            stream::iter(vec![Ok(fragment0_batch), Ok(fragment1_batch)]),
2216        ));
2217
2218        let aligned_stream = add_row_addr(aligned_stream);
2219
2220        let batches: Vec<RecordBatch> = aligned_stream.try_collect().await.unwrap();
2221
2222        assert_eq!(batches.len(), 2);
2223
2224        // Check fragment 0 _rowaddr values
2225        let fragment0_rowaddr_col = batches[0].column_by_name(ROW_ADDR).unwrap();
2226        let fragment0_rowaddrs = fragment0_rowaddr_col
2227            .as_any()
2228            .downcast_ref::<UInt64Array>()
2229            .unwrap();
2230
2231        // Fragment 0 should have _rowaddr values: 0, 1, 2, 3, 4
2232        assert_eq!(fragment0_rowaddrs.values(), &[0, 1, 2, 3, 4]);
2233
2234        // Check fragment 1 _rowaddr values
2235        let fragment1_rowaddr_col = batches[1].column_by_name(ROW_ADDR).unwrap();
2236        let fragment1_rowaddrs = fragment1_rowaddr_col
2237            .as_any()
2238            .downcast_ref::<UInt64Array>()
2239            .unwrap();
2240
2241        // Fragment 1 should have _rowaddr values: (1 << 32) | 0, (1 << 32) | 1, etc.
2242        // which is: 4294967296, 4294967297, 4294967298, 4294967299, 4294967300
2243        assert_eq!(
2244            fragment1_rowaddrs.values(),
2245            &[4294967296, 4294967297, 4294967298, 4294967299, 4294967300]
2246        );
2247    }
2248
2249    #[tokio::test]
2250    async fn test_like_prefix_query() {
2251        let tmpdir = TempObjDir::default();
2252        let test_store = Arc::new(LanceIndexStore::new(
2253            Arc::new(ObjectStore::local()),
2254            tmpdir.clone(),
2255            Arc::new(LanceCache::no_cache()),
2256        ));
2257
2258        // Create zones with different string ranges
2259        // Zone 0: ["aaa", "azz"] - should NOT match "foo%"
2260        // Zone 1: ["bar", "baz"] - should NOT match "foo%"
2261        // Zone 2: ["fa", "foz"]  - should match "foo%" (contains potential matches)
2262        // Zone 3: ["fop", "fzz"] - should NOT match "foo%" (all values >= "fop")
2263        // Zone 4: ["foo", "foobar"] - should match "foo%"
2264        // Zone 5: ["gaa", "gzz"] - should NOT match "foo%"
2265
2266        let zones = vec![
2267            ZoneMapStatistics {
2268                min: ScalarValue::Utf8(Some("aaa".to_string())),
2269                max: ScalarValue::Utf8(Some("azz".to_string())),
2270                null_count: 0,
2271                nan_count: 0,
2272                bound: ZoneBound {
2273                    fragment_id: 0,
2274                    start: 0,
2275                    length: 100,
2276                },
2277            },
2278            ZoneMapStatistics {
2279                min: ScalarValue::Utf8(Some("bar".to_string())),
2280                max: ScalarValue::Utf8(Some("baz".to_string())),
2281                null_count: 0,
2282                nan_count: 0,
2283                bound: ZoneBound {
2284                    fragment_id: 1,
2285                    start: 0,
2286                    length: 100,
2287                },
2288            },
2289            ZoneMapStatistics {
2290                min: ScalarValue::Utf8(Some("fa".to_string())),
2291                max: ScalarValue::Utf8(Some("foz".to_string())),
2292                null_count: 0,
2293                nan_count: 0,
2294                bound: ZoneBound {
2295                    fragment_id: 2,
2296                    start: 0,
2297                    length: 100,
2298                },
2299            },
2300            ZoneMapStatistics {
2301                min: ScalarValue::Utf8(Some("fop".to_string())),
2302                max: ScalarValue::Utf8(Some("fzz".to_string())),
2303                null_count: 0,
2304                nan_count: 0,
2305                bound: ZoneBound {
2306                    fragment_id: 3,
2307                    start: 0,
2308                    length: 100,
2309                },
2310            },
2311            ZoneMapStatistics {
2312                min: ScalarValue::Utf8(Some("foo".to_string())),
2313                max: ScalarValue::Utf8(Some("foobar".to_string())),
2314                null_count: 0,
2315                nan_count: 0,
2316                bound: ZoneBound {
2317                    fragment_id: 4,
2318                    start: 0,
2319                    length: 100,
2320                },
2321            },
2322            ZoneMapStatistics {
2323                min: ScalarValue::Utf8(Some("gaa".to_string())),
2324                max: ScalarValue::Utf8(Some("gzz".to_string())),
2325                null_count: 0,
2326                nan_count: 0,
2327                bound: ZoneBound {
2328                    fragment_id: 5,
2329                    start: 0,
2330                    length: 100,
2331                },
2332            },
2333        ];
2334
2335        let index = ZoneMapIndex {
2336            zones,
2337            data_type: DataType::Utf8,
2338            rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2339            store: test_store,
2340            fri: None,
2341            index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2342        };
2343
2344        // Test LikePrefix query for "foo"
2345        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("foo".to_string())));
2346        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2347
2348        // Should match zones 2 and 4 only
2349        let mut expected = RowAddrTreeMap::new();
2350        // Zone 2: fragment 2
2351        expected.insert_range((2u64 << 32)..((2u64 << 32) + 100));
2352        // Zone 4: fragment 4
2353        expected.insert_range((4u64 << 32)..((4u64 << 32) + 100));
2354
2355        assert_eq!(result, SearchResult::at_most(expected));
2356    }
2357
2358    #[tokio::test]
2359    async fn test_like_prefix_edge_cases() {
2360        let tmpdir = TempObjDir::default();
2361        let test_store = Arc::new(LanceIndexStore::new(
2362            Arc::new(ObjectStore::local()),
2363            tmpdir.clone(),
2364            Arc::new(LanceCache::no_cache()),
2365        ));
2366
2367        // Test edge cases for LIKE prefix
2368        let zones = vec![
2369            // Zone with values that contain the prefix exactly
2370            ZoneMapStatistics {
2371                min: ScalarValue::Utf8(Some("test".to_string())),
2372                max: ScalarValue::Utf8(Some("test".to_string())),
2373                null_count: 0,
2374                nan_count: 0,
2375                bound: ZoneBound {
2376                    fragment_id: 0,
2377                    start: 0,
2378                    length: 100,
2379                },
2380            },
2381            // Zone with values that span across the prefix boundary
2382            ZoneMapStatistics {
2383                min: ScalarValue::Utf8(Some("te".to_string())),
2384                max: ScalarValue::Utf8(Some("tf".to_string())),
2385                null_count: 0,
2386                nan_count: 0,
2387                bound: ZoneBound {
2388                    fragment_id: 1,
2389                    start: 0,
2390                    length: 100,
2391                },
2392            },
2393            // Zone completely before prefix
2394            ZoneMapStatistics {
2395                min: ScalarValue::Utf8(Some("abc".to_string())),
2396                max: ScalarValue::Utf8(Some("def".to_string())),
2397                null_count: 0,
2398                nan_count: 0,
2399                bound: ZoneBound {
2400                    fragment_id: 2,
2401                    start: 0,
2402                    length: 100,
2403                },
2404            },
2405        ];
2406
2407        let index = ZoneMapIndex {
2408            zones,
2409            data_type: DataType::Utf8,
2410            rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2411            store: test_store,
2412            fri: None,
2413            index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2414        };
2415
2416        // Test LikePrefix "test"
2417        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test".to_string())));
2418        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2419
2420        // Should match zones 0 and 1
2421        let mut expected = RowAddrTreeMap::new();
2422        expected.insert_range(0..100); // Zone 0: fragment 0
2423        expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2424
2425        assert_eq!(result, SearchResult::at_most(expected));
2426
2427        // Test empty prefix - should match all zones
2428        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("".to_string())));
2429        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2430
2431        let mut expected = RowAddrTreeMap::new();
2432        expected.insert_range(0..100); // Zone 0: fragment 0
2433        expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2434        expected.insert_range((2u64 << 32)..((2u64 << 32) + 100));
2435
2436        assert_eq!(result, SearchResult::at_most(expected));
2437    }
2438
2439    #[tokio::test]
2440    async fn test_like_prefix_large_utf8() {
2441        let tmpdir = TempObjDir::default();
2442        let test_store = Arc::new(LanceIndexStore::new(
2443            Arc::new(ObjectStore::local()),
2444            tmpdir.clone(),
2445            Arc::new(LanceCache::no_cache()),
2446        ));
2447
2448        // Test with LargeUtf8 type
2449        let zones = vec![
2450            ZoneMapStatistics {
2451                min: ScalarValue::LargeUtf8(Some("aaa".to_string())),
2452                max: ScalarValue::LargeUtf8(Some("azz".to_string())),
2453                null_count: 0,
2454                nan_count: 0,
2455                bound: ZoneBound {
2456                    fragment_id: 0,
2457                    start: 0,
2458                    length: 100,
2459                },
2460            },
2461            ZoneMapStatistics {
2462                min: ScalarValue::LargeUtf8(Some("foo".to_string())),
2463                max: ScalarValue::LargeUtf8(Some("foobar".to_string())),
2464                null_count: 0,
2465                nan_count: 0,
2466                bound: ZoneBound {
2467                    fragment_id: 1,
2468                    start: 0,
2469                    length: 100,
2470                },
2471            },
2472        ];
2473
2474        let index = ZoneMapIndex {
2475            zones,
2476            data_type: DataType::LargeUtf8,
2477            rows_per_zone: ROWS_PER_ZONE_DEFAULT,
2478            store: test_store,
2479            fri: None,
2480            index_cache: WeakLanceCache::from(&LanceCache::no_cache()),
2481        };
2482
2483        // Test LikePrefix with LargeUtf8
2484        let query = SargableQuery::LikePrefix(ScalarValue::LargeUtf8(Some("foo".to_string())));
2485        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2486
2487        // Should match only zone 1
2488        let mut expected = RowAddrTreeMap::new();
2489        expected.insert_range((1u64 << 32)..((1u64 << 32) + 100));
2490
2491        assert_eq!(result, SearchResult::at_most(expected));
2492    }
2493
2494    #[test]
2495    fn test_compute_next_prefix() {
2496        use super::compute_next_prefix;
2497
2498        // Basic cases
2499        assert_eq!(compute_next_prefix("foo"), Some("fop".to_string()));
2500        assert_eq!(compute_next_prefix("abc"), Some("abd".to_string()));
2501        assert_eq!(compute_next_prefix("a"), Some("b".to_string()));
2502        assert_eq!(compute_next_prefix("z"), Some("{".to_string())); // 'z' + 1 = '{'
2503
2504        // Edge case: prefix with 'z' at the end
2505        assert_eq!(compute_next_prefix("abz"), Some("ab{".to_string()));
2506
2507        // Edge case with tilde (~) which is 0x7E
2508        assert_eq!(compute_next_prefix("ab~"), Some("ab\x7f".to_string()));
2509
2510        // Empty prefix
2511        assert_eq!(compute_next_prefix(""), None);
2512
2513        // Non-ASCII: works correctly by incrementing Unicode code points
2514        // é (U+00E9) -> ê (U+00EA)
2515        assert_eq!(compute_next_prefix("café"), Some("cafê".to_string()));
2516        // 中 (U+4E2D) -> 丮 (U+4E2E)
2517        assert_eq!(compute_next_prefix("abc中"), Some("abc丮".to_string()));
2518        // ÿ (U+00FF) -> Ā (U+0100) - crosses byte boundary but works
2519        assert_eq!(compute_next_prefix("cafÿ"), Some("cafĀ".to_string()));
2520
2521        // Edge case: character just before surrogate range
2522        // U+D7FF -> U+E000 (skips surrogate range U+D800-U+DFFF)
2523        assert_eq!(
2524            compute_next_prefix("a\u{D7FF}"),
2525            Some("a\u{E000}".to_string())
2526        );
2527
2528        // Edge case: max Unicode character U+10FFFF, falls back to previous char
2529        assert_eq!(compute_next_prefix("ab\u{10FFFF}"), Some("ac".to_string()));
2530        // All max characters
2531        assert_eq!(compute_next_prefix("\u{10FFFF}\u{10FFFF}"), None);
2532    }
2533}