lance_index/scalar/btree.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    cmp::Ordering,
7    collections::{BTreeMap, BinaryHeap, HashMap},
8    fmt::{Debug, Display},
9    ops::Bound,
10    sync::Arc,
11};
12
13use super::{
14    flat::FlatIndexMetadata, AnyQuery, BuiltinIndexType, IndexReader, IndexStore, IndexWriter,
15    MetricsCollector, SargableQuery, ScalarIndex, ScalarIndexParams, SearchResult,
16};
17use crate::pbold;
18use crate::{
19    frag_reuse::FragReuseIndex,
20    scalar::{
21        expression::{SargableQueryParser, ScalarQueryParser},
22        registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},
23        CreatedIndex, UpdateCriteria,
24    },
25};
26use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
27use crate::{Index, IndexType};
28use arrow_array::{new_empty_array, Array, RecordBatch, UInt32Array};
29use arrow_schema::{DataType, Field, Schema, SortOptions};
30use async_trait::async_trait;
31use datafusion::physical_plan::{
32    sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
33    union::UnionExec, ExecutionPlan, SendableRecordBatchStream,
34};
35use datafusion_common::{DataFusionError, ScalarValue};
36use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
37use deepsize::DeepSizeOf;
38use futures::{
39    future::BoxFuture,
40    stream::{self},
41    FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
42};
43use lance_core::{
44    cache::{CacheKey, LanceCache, WeakLanceCache},
45    error::LanceOptionExt,
46    utils::{
47        mask::RowIdTreeMap,
48        tokio::get_num_compute_intensive_cpus,
49        tracing::{IO_TYPE_LOAD_SCALAR_PART, TRACE_IO_EVENTS},
50    },
51    Error, Result, ROW_ID,
52};
53use lance_datafusion::{
54    chunker::chunk_concat_stream,
55    exec::{execute_plan, LanceExecutionOptions, OneShotExec},
56};
57use lance_io::object_store::ObjectStore;
58use log::{debug, warn};
59use object_store::path::Path;
60use roaring::RoaringBitmap;
61use serde::{Deserialize, Serialize, Serializer};
62use snafu::location;
63use tracing::info;
64
65const BTREE_LOOKUP_NAME: &str = "page_lookup.lance";
66const BTREE_PAGES_NAME: &str = "page_data.lance";
67pub const DEFAULT_BTREE_BATCH_SIZE: u64 = 4096;
68const BATCH_SIZE_META_KEY: &str = "batch_size";
69const BTREE_INDEX_VERSION: u32 = 0;
70pub(crate) const BTREE_VALUES_COLUMN: &str = "values";
71pub(crate) const BTREE_IDS_COLUMN: &str = "ids";
72
73/// Wraps a ScalarValue and implements Ord (ScalarValue only implements PartialOrd)
74#[derive(Clone, Debug)]
75pub struct OrderableScalarValue(pub ScalarValue);
76
77impl DeepSizeOf for OrderableScalarValue {
78    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
79        // deepsize and size both factor in the size of the ScalarValue
80        self.0.size() - std::mem::size_of::<ScalarValue>()
81    }
82}
83
84impl Display for OrderableScalarValue {
85    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86        std::fmt::Display::fmt(&self.0, f)
87    }
88}
89
90impl PartialEq for OrderableScalarValue {
91    fn eq(&self, other: &Self) -> bool {
92        self.0.eq(&other.0)
93    }
94}
95
96impl Eq for OrderableScalarValue {}
97
98impl PartialOrd for OrderableScalarValue {
99    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
100        Some(self.cmp(other))
101    }
102}
103
104// manual implementation of `Ord` that panics when asked to compare scalars of different types
105// and always puts nulls before non-nulls (this is consistent with Option<T>'s implementation
106// of Ord)
107//
108// TODO: Consider upstreaming this
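// For example, Int32(None) sorts before Int32(Some(i32::MIN)), and floats are compared
// with total_cmp so NaN has a well-defined position instead of breaking the ordering.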
109impl Ord for OrderableScalarValue {
110    fn cmp(&self, other: &Self) -> Ordering {
111        use ScalarValue::*;
112        // This purposely doesn't have a catch-all "(_, _)" so that
113        // any newly added enum variant will require editing this list
114        // or else face a compile error
115        match (&self.0, &other.0) {
116            (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
117                if p1.eq(p2) && s1.eq(s2) {
118                    v1.cmp(v2)
119                } else {
120                    // Two decimal values can only be compared if they have the same precision and scale.
121                    panic!("Attempt to compare decimals with unequal precision / scale")
122                }
123            }
124            (Decimal128(v1, _, _), Null) => {
125                if v1.is_none() {
126                    Ordering::Equal
127                } else {
128                    Ordering::Greater
129                }
130            }
131            (Decimal128(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
132            (Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
133                if p1.eq(p2) && s1.eq(s2) {
134                    v1.cmp(v2)
135                } else {
136                    // Two decimal values can only be compared if they have the same precision and scale.
137                    panic!("Attempt to compare decimals with unequal precision / scale")
138                }
139            }
140            (Decimal256(v1, _, _), Null) => {
141                if v1.is_none() {
142                    Ordering::Equal
143                } else {
144                    Ordering::Greater
145                }
146            }
147            (Decimal256(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
148            (Boolean(v1), Boolean(v2)) => v1.cmp(v2),
149            (Boolean(v1), Null) => {
150                if v1.is_none() {
151                    Ordering::Equal
152                } else {
153                    Ordering::Greater
154                }
155            }
156            (Boolean(_), _) => panic!("Attempt to compare boolean with non-boolean"),
157            (Float32(v1), Float32(v2)) => match (v1, v2) {
158                (Some(f1), Some(f2)) => f1.total_cmp(f2),
159                (None, Some(_)) => Ordering::Less,
160                (Some(_), None) => Ordering::Greater,
161                (None, None) => Ordering::Equal,
162            },
163            (Float32(v1), Null) => {
164                if v1.is_none() {
165                    Ordering::Equal
166                } else {
167                    Ordering::Greater
168                }
169            }
170            (Float32(_), _) => panic!("Attempt to compare f32 with non-f32"),
171            (Float64(v1), Float64(v2)) => match (v1, v2) {
172                (Some(f1), Some(f2)) => f1.total_cmp(f2),
173                (None, Some(_)) => Ordering::Less,
174                (Some(_), None) => Ordering::Greater,
175                (None, None) => Ordering::Equal,
176            },
177            (Float64(v1), Null) => {
178                if v1.is_none() {
179                    Ordering::Equal
180                } else {
181                    Ordering::Greater
182                }
183            }
184            (Float64(_), _) => panic!("Attempt to compare f64 with non-f64"),
185            (Float16(v1), Float16(v2)) => match (v1, v2) {
186                (Some(f1), Some(f2)) => f1.total_cmp(f2),
187                (None, Some(_)) => Ordering::Less,
188                (Some(_), None) => Ordering::Greater,
189                (None, None) => Ordering::Equal,
190            },
191            (Float16(v1), Null) => {
192                if v1.is_none() {
193                    Ordering::Equal
194                } else {
195                    Ordering::Greater
196                }
197            }
198            (Float16(_), _) => panic!("Attempt to compare f16 with non-f16"),
199            (Int8(v1), Int8(v2)) => v1.cmp(v2),
200            (Int8(v1), Null) => {
201                if v1.is_none() {
202                    Ordering::Equal
203                } else {
204                    Ordering::Greater
205                }
206            }
207            (Int8(_), _) => panic!("Attempt to compare Int8 with non-Int8"),
208            (Int16(v1), Int16(v2)) => v1.cmp(v2),
209            (Int16(v1), Null) => {
210                if v1.is_none() {
211                    Ordering::Equal
212                } else {
213                    Ordering::Greater
214                }
215            }
216            (Int16(_), _) => panic!("Attempt to compare Int16 with non-Int16"),
217            (Int32(v1), Int32(v2)) => v1.cmp(v2),
218            (Int32(v1), Null) => {
219                if v1.is_none() {
220                    Ordering::Equal
221                } else {
222                    Ordering::Greater
223                }
224            }
225            (Int32(_), _) => panic!("Attempt to compare Int32 with non-Int32"),
226            (Int64(v1), Int64(v2)) => v1.cmp(v2),
227            (Int64(v1), Null) => {
228                if v1.is_none() {
229                    Ordering::Equal
230                } else {
231                    Ordering::Greater
232                }
233            }
234            (Int64(_), _) => panic!("Attempt to compare Int64 with non-Int64"),
235            (UInt8(v1), UInt8(v2)) => v1.cmp(v2),
236            (UInt8(v1), Null) => {
237                if v1.is_none() {
238                    Ordering::Equal
239                } else {
240                    Ordering::Greater
241                }
242            }
243            (UInt8(_), _) => panic!("Attempt to compare UInt8 with non-UInt8"),
244            (UInt16(v1), UInt16(v2)) => v1.cmp(v2),
245            (UInt16(v1), Null) => {
246                if v1.is_none() {
247                    Ordering::Equal
248                } else {
249                    Ordering::Greater
250                }
251            }
252            (UInt16(_), _) => panic!("Attempt to compare UInt16 with non-UInt16"),
253            (UInt32(v1), UInt32(v2)) => v1.cmp(v2),
254            (UInt32(v1), Null) => {
255                if v1.is_none() {
256                    Ordering::Equal
257                } else {
258                    Ordering::Greater
259                }
260            }
261            (UInt32(_), _) => panic!("Attempt to compare UInt32 with non-UInt32"),
262            (UInt64(v1), UInt64(v2)) => v1.cmp(v2),
263            (UInt64(v1), Null) => {
264                if v1.is_none() {
265                    Ordering::Equal
266                } else {
267                    Ordering::Greater
268                }
269            }
270            (UInt64(_), _) => panic!("Attempt to compare UInt64 with non-UInt64"),
271            (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Utf8(v2) | Utf8View(v2) | LargeUtf8(v2)) => {
272                v1.cmp(v2)
273            }
274            (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Null) => {
275                if v1.is_none() {
276                    Ordering::Equal
277                } else {
278                    Ordering::Greater
279                }
280            }
281            (Utf8(_) | Utf8View(_) | LargeUtf8(_), _) => {
282                panic!("Attempt to compare Utf8 with non-Utf8")
283            }
284            (
285                Binary(v1) | LargeBinary(v1) | BinaryView(v1),
286                Binary(v2) | LargeBinary(v2) | BinaryView(v2),
287            ) => v1.cmp(v2),
288            (Binary(v1) | LargeBinary(v1) | BinaryView(v1), Null) => {
289                if v1.is_none() {
290                    Ordering::Equal
291                } else {
292                    Ordering::Greater
293                }
294            }
295            (Binary(_) | LargeBinary(_) | BinaryView(_), _) => {
296                panic!("Attempt to compare Binary with non-Binary")
297            }
298            (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.cmp(v2),
299            (FixedSizeBinary(_, v1), Null) => {
300                if v1.is_none() {
301                    Ordering::Equal
302                } else {
303                    Ordering::Greater
304                }
305            }
306            (FixedSizeBinary(_, _), _) => {
307                panic!("Attempt to compare FixedSizeBinary with non-FixedSizeBinary")
308            }
309            (FixedSizeList(left), FixedSizeList(right)) => {
310                if left.eq(right) {
311                    todo!()
312                } else {
313                    panic!(
314                        "Attempt to compare fixed size list elements with different widths/fields"
315                    )
316                }
317            }
318            (FixedSizeList(left), Null) => {
319                if left.is_null(0) {
320                    Ordering::Equal
321                } else {
322                    Ordering::Greater
323                }
324            }
325            (FixedSizeList(_), _) => {
326                panic!("Attempt to compare FixedSizeList with non-FixedSizeList")
327            }
328            (List(_), List(_)) => todo!(),
329            (List(left), Null) => {
330                if left.is_null(0) {
331                    Ordering::Equal
332                } else {
333                    Ordering::Greater
334                }
335            }
336            (List(_), _) => {
337                panic!("Attempt to compare List with non-List")
338            }
339            (LargeList(_), _) => todo!(),
340            (Map(_), Map(_)) => todo!(),
341            (Map(left), Null) => {
342                if left.is_null(0) {
343                    Ordering::Equal
344                } else {
345                    Ordering::Greater
346                }
347            }
348            (Map(_), _) => {
349                panic!("Attempt to compare Map with non-Map")
350            }
351            (Date32(v1), Date32(v2)) => v1.cmp(v2),
352            (Date32(v1), Null) => {
353                if v1.is_none() {
354                    Ordering::Equal
355                } else {
356                    Ordering::Greater
357                }
358            }
359            (Date32(_), _) => panic!("Attempt to compare Date32 with non-Date32"),
360            (Date64(v1), Date64(v2)) => v1.cmp(v2),
361            (Date64(v1), Null) => {
362                if v1.is_none() {
363                    Ordering::Equal
364                } else {
365                    Ordering::Greater
366                }
367            }
368            (Date64(_), _) => panic!("Attempt to compare Date64 with non-Date64"),
369            (Time32Second(v1), Time32Second(v2)) => v1.cmp(v2),
370            (Time32Second(v1), Null) => {
371                if v1.is_none() {
372                    Ordering::Equal
373                } else {
374                    Ordering::Greater
375                }
376            }
377            (Time32Second(_), _) => panic!("Attempt to compare Time32Second with non-Time32Second"),
378            (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.cmp(v2),
379            (Time32Millisecond(v1), Null) => {
380                if v1.is_none() {
381                    Ordering::Equal
382                } else {
383                    Ordering::Greater
384                }
385            }
386            (Time32Millisecond(_), _) => {
387                panic!("Attempt to compare Time32Millisecond with non-Time32Millisecond")
388            }
389            (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.cmp(v2),
390            (Time64Microsecond(v1), Null) => {
391                if v1.is_none() {
392                    Ordering::Equal
393                } else {
394                    Ordering::Greater
395                }
396            }
397            (Time64Microsecond(_), _) => {
398                panic!("Attempt to compare Time64Microsecond with non-Time64Microsecond")
399            }
400            (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.cmp(v2),
401            (Time64Nanosecond(v1), Null) => {
402                if v1.is_none() {
403                    Ordering::Equal
404                } else {
405                    Ordering::Greater
406                }
407            }
408            (Time64Nanosecond(_), _) => {
409                panic!("Attempt to compare Time64Nanosecond with non-Time64Nanosecond")
410            }
411            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.cmp(v2),
412            (TimestampSecond(v1, _), Null) => {
413                if v1.is_none() {
414                    Ordering::Equal
415                } else {
416                    Ordering::Greater
417                }
418            }
419            (TimestampSecond(_, _), _) => {
420                panic!("Attempt to compare TimestampSecond with non-TimestampSecond")
421            }
422            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.cmp(v2),
423            (TimestampMillisecond(v1, _), Null) => {
424                if v1.is_none() {
425                    Ordering::Equal
426                } else {
427                    Ordering::Greater
428                }
429            }
430            (TimestampMillisecond(_, _), _) => {
431                panic!("Attempt to compare TimestampMillisecond with non-TimestampMillisecond")
432            }
433            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.cmp(v2),
434            (TimestampMicrosecond(v1, _), Null) => {
435                if v1.is_none() {
436                    Ordering::Equal
437                } else {
438                    Ordering::Greater
439                }
440            }
441            (TimestampMicrosecond(_, _), _) => {
442                panic!("Attempt to compare TimestampMicrosecond with non-TimestampMicrosecond")
443            }
444            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.cmp(v2),
445            (TimestampNanosecond(v1, _), Null) => {
446                if v1.is_none() {
447                    Ordering::Equal
448                } else {
449                    Ordering::Greater
450                }
451            }
452            (TimestampNanosecond(_, _), _) => {
453                panic!("Attempt to compare TimestampNanosecond with non-TimestampNanosecond")
454            }
455            (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.cmp(v2),
456            (IntervalYearMonth(v1), Null) => {
457                if v1.is_none() {
458                    Ordering::Equal
459                } else {
460                    Ordering::Greater
461                }
462            }
463            (IntervalYearMonth(_), _) => {
464                panic!("Attempt to compare IntervalYearMonth with non-IntervalYearMonth")
465            }
466            (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.cmp(v2),
467            (IntervalDayTime(v1), Null) => {
468                if v1.is_none() {
469                    Ordering::Equal
470                } else {
471                    Ordering::Greater
472                }
473            }
474            (IntervalDayTime(_), _) => {
475                panic!("Attempt to compare IntervalDayTime with non-IntervalDayTime")
476            }
477            (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.cmp(v2),
478            (IntervalMonthDayNano(v1), Null) => {
479                if v1.is_none() {
480                    Ordering::Equal
481                } else {
482                    Ordering::Greater
483                }
484            }
485            (IntervalMonthDayNano(_), _) => {
486                panic!("Attempt to compare IntervalMonthDayNano with non-IntervalMonthDayNano")
487            }
488            (DurationSecond(v1), DurationSecond(v2)) => v1.cmp(v2),
489            (DurationSecond(v1), Null) => {
490                if v1.is_none() {
491                    Ordering::Equal
492                } else {
493                    Ordering::Greater
494                }
495            }
496            (DurationSecond(_), _) => {
497                panic!("Attempt to compare DurationSecond with non-DurationSecond")
498            }
499            (DurationMillisecond(v1), DurationMillisecond(v2)) => v1.cmp(v2),
500            (DurationMillisecond(v1), Null) => {
501                if v1.is_none() {
502                    Ordering::Equal
503                } else {
504                    Ordering::Greater
505                }
506            }
507            (DurationMillisecond(_), _) => {
508                panic!("Attempt to compare DurationMillisecond with non-DurationMillisecond")
509            }
510            (DurationMicrosecond(v1), DurationMicrosecond(v2)) => v1.cmp(v2),
511            (DurationMicrosecond(v1), Null) => {
512                if v1.is_none() {
513                    Ordering::Equal
514                } else {
515                    Ordering::Greater
516                }
517            }
518            (DurationMicrosecond(_), _) => {
519                panic!("Attempt to compare DurationMicrosecond with non-DurationMicrosecond")
520            }
521            (DurationNanosecond(v1), DurationNanosecond(v2)) => v1.cmp(v2),
522            (DurationNanosecond(v1), Null) => {
523                if v1.is_none() {
524                    Ordering::Equal
525                } else {
526                    Ordering::Greater
527                }
528            }
529            (DurationNanosecond(_), _) => {
530                panic!("Attempt to compare DurationNanosecond with non-DurationNanosecond")
531            }
532            (Struct(_arr), Struct(_arr2)) => todo!(),
533            (Struct(arr), Null) => {
534                if arr.is_empty() {
535                    Ordering::Equal
536                } else {
537                    Ordering::Greater
538                }
539            }
540            (Struct(_arr), _) => panic!("Attempt to compare Struct with non-Struct"),
541            (Dictionary(_k1, _v1), Dictionary(_k2, _v2)) => todo!(),
542            (Dictionary(_, v1), Null) => Self(*v1.clone()).cmp(&Self(ScalarValue::Null)),
543            (Dictionary(_, _), _) => panic!("Attempt to compare Dictionary with non-Dictionary"),
544            // What would a btree of unions even look like?  May not be possible.
545            (Union(_, _, _), _) => todo!("Support for union scalars"),
546            (Null, Null) => Ordering::Equal,
547            (Null, _) => todo!(),
548        }
549    }
550}
551
552#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
553struct PageRecord {
554    max: OrderableScalarValue,
555    page_number: u32,
556}
557
558trait BTreeMapExt<K, V> {
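    /// Returns the entry with the largest key strictly less than `key`, if any.
    /// For example, with keys {1, 5, 10} and `key = 7` this returns the entry for 5.
    ///
    /// Used to widen range lookups so a page whose min is below the query bound is still visited.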
559    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)>;
560}
561
562impl<K: Ord, V> BTreeMapExt<K, V> for BTreeMap<K, V> {
563    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)> {
564        self.range((Bound::Unbounded, Bound::Excluded(key)))
565            .next_back()
566    }
567}
568
569/// An in-memory structure that can quickly satisfy scalar queries using a btree of ScalarValue
570#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
571pub struct BTreeLookup {
572    tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
573    /// Pages where the value may be null
574    null_pages: Vec<u32>,
575}
576
577impl BTreeLookup {
578    fn new(tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>, null_pages: Vec<u32>) -> Self {
579        Self { tree, null_pages }
580    }
581
582    // All pages that could have a value equal to val
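    // Note: the exclusive upper bound passed here is widened back to an inclusive one inside
    // pages_between, so an equality probe still reaches pages whose minimum equals the value.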
583    fn pages_eq(&self, query: &OrderableScalarValue) -> Vec<u32> {
584        if query.0.is_null() {
585            self.pages_null()
586        } else {
587            self.pages_between((Bound::Included(query), Bound::Excluded(query)))
588        }
589    }
590
591    // All pages that could have a value equal to one of the values
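    // e.g. if 5 lives in pages [1, 2], 3 in page [2], and 7 in pages [8, 9], the result is
    // the sorted, deduplicated union [1, 2, 8, 9]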
592    fn pages_in(&self, values: impl IntoIterator<Item = OrderableScalarValue>) -> Vec<u32> {
593        let page_lists = values
594            .into_iter()
595            .map(|val| self.pages_eq(&val))
596            .collect::<Vec<_>>();
597        let total_size = page_lists.iter().map(|set| set.len()).sum();
598        let mut heap = BinaryHeap::with_capacity(total_size);
599        for page_list in page_lists {
600            heap.extend(page_list);
601        }
602        let mut all_pages = heap.into_sorted_vec();
603        all_pages.dedup();
604        all_pages
605    }
606
607    // All pages that could have a value in the range
608    fn pages_between(
609        &self,
610        range: (Bound<&OrderableScalarValue>, Bound<&OrderableScalarValue>),
611    ) -> Vec<u32> {
612        // We need to grab a little bit left of the given range because the query might be 7
613        // and the first page might be something like 5-10.
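        // e.g. with page mins {1, 5, 11} and a query of [7, 12], largest_node_less(7) returns
        // the 5 entry, so the scan starts at the [5-10] page even though 5 < 7.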
614        let lower_bound = match range.0 {
615            Bound::Unbounded => Bound::Unbounded,
616            // It doesn't matter if the bound is exclusive or inclusive.  We are going to grab
617            // the first node whose min is strictly less than the given bound.  Then we grab
618            // all nodes greater than or equal to that
619            //
620            // We have to peek a bit to the left because we might have something like a lower
621            // bound of 7 and there is a page [5-10] we want to search for.
622            Bound::Included(lower) => self
623                .tree
624                .largest_node_less(lower)
625                .map(|val| Bound::Included(val.0))
626                .unwrap_or(Bound::Unbounded),
627            Bound::Excluded(lower) => self
628                .tree
629                .largest_node_less(lower)
630                .map(|val| Bound::Included(val.0))
631                .unwrap_or(Bound::Unbounded),
632        };
633        let upper_bound = match range.1 {
634            Bound::Unbounded => Bound::Unbounded,
635            Bound::Included(upper) => Bound::Included(upper),
636            // Even if the upper bound is excluded we still need to include it.  This is because the
637            // query might be [x, x).  Our lower bound might find some [a-x] bucket and we still
638            // want to include any [x, z] bucket.
639            //
640            // We could be slightly more accurate here and only include the upper bound if the lower bound
641            // is defined, inclusive, and equal to the upper bound.  However, let's keep it simple for now.  This
642            // should only affect the probably rare case that our query is a true range query and the value
643            // matches an upper bound.  This will all be moot if/when we merge pages.
644            Bound::Excluded(upper) => Bound::Included(upper),
645        };
646
647        match (lower_bound, upper_bound) {
648            (Bound::Excluded(lower), Bound::Excluded(upper))
649            | (Bound::Excluded(lower), Bound::Included(upper))
650            | (Bound::Included(lower), Bound::Excluded(upper)) => {
651                // It's not really clear what (Included(5), Excluded(5)) would mean so we
652                // interpret it as an empty range which matches rust's BTreeMap behavior
653                if lower >= upper {
654                    return vec![];
655                }
656            }
657            (Bound::Included(lower), Bound::Included(upper)) => {
658                if lower > upper {
659                    return vec![];
660                }
661            }
662            _ => {}
663        }
664
665        let candidates = self
666            .tree
667            .range((lower_bound, upper_bound))
668            .flat_map(|val| val.1);
669        match lower_bound {
670            Bound::Unbounded => candidates.map(|val| val.page_number).collect(),
671            Bound::Included(lower_bound) => candidates
672                .filter(|val| val.max.cmp(lower_bound) != Ordering::Less)
673                .map(|val| val.page_number)
674                .collect(),
675            Bound::Excluded(lower_bound) => candidates
676                .filter(|val| val.max.cmp(lower_bound) == Ordering::Greater)
677                .map(|val| val.page_number)
678                .collect(),
679        }
680    }
681
682    fn pages_null(&self) -> Vec<u32> {
683        self.null_pages.clone()
684    }
685}
686
687// We only need to open a file reader for pages if we need to load a page.  If all
688// pages are cached we don't open it.  If we do open it we should only open it once.
689#[derive(Clone)]
690struct LazyIndexReader {
691    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
692    store: Arc<dyn IndexStore>,
693}
694
695impl LazyIndexReader {
696    fn new(store: Arc<dyn IndexStore>) -> Self {
697        Self {
698            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
699            store,
700        }
701    }
702
703    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
704        let mut reader = self.index_reader.lock().await;
705        if reader.is_none() {
706            let index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
707            *reader = Some(index_reader);
708        }
709        Ok(reader.as_ref().unwrap().clone())
710    }
711}
712
713/// A btree index satisfies scalar queries using a b tree
714///
715/// The upper layers of the btree are expected to be cached and, when unloaded,
716/// are stored in a btree structure in memory.  The leaves of the btree are left
717/// to be searched by some other kind of index (currently a flat search).
718///
719/// This strikes a balance between an expensive memory structure containing all
720/// of the values and an expensive disk structure that can't be efficiently searched.
721///
722/// For example, given 1Bi values we can store 256Ki leaves of size 4Ki.  We only
723/// need memory space for 256Ki leaves (depends on the data type but usually a few MiB
724/// at most) and can narrow our search to 4Ki values.
725///
726// Cache key implementation for type-safe cache access
727#[derive(Debug, Clone, DeepSizeOf)]
728pub struct CachedScalarIndex(Arc<dyn ScalarIndex>);
729
730impl CachedScalarIndex {
731    pub fn new(index: Arc<dyn ScalarIndex>) -> Self {
732        Self(index)
733    }
734
735    pub fn into_inner(self) -> Arc<dyn ScalarIndex> {
736        self.0
737    }
738}
739
740#[derive(Debug, Clone)]
741pub struct BTreePageKey {
742    pub page_number: u32,
743}
744
745impl CacheKey for BTreePageKey {
746    type ValueType = CachedScalarIndex;
747
748    fn key(&self) -> std::borrow::Cow<'_, str> {
749        format!("page-{}", self.page_number).into()
750    }
751}
752
753/// Note: this is very similar to the IVF index except we store the IVF part in a btree
754/// for faster lookup
755#[derive(Clone, Debug)]
756pub struct BTreeIndex {
757    page_lookup: Arc<BTreeLookup>,
758    index_cache: WeakLanceCache,
759    store: Arc<dyn IndexStore>,
760    sub_index: Arc<dyn BTreeSubIndex>,
761    batch_size: u64,
762    frag_reuse_index: Option<Arc<FragReuseIndex>>,
763}
764
765impl DeepSizeOf for BTreeIndex {
766    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
767        // We don't include the index cache, or anything stored in it. For example:
768        // sub_index and frag_reuse_index.
769        self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context)
770    }
771}
772
773impl BTreeIndex {
774    fn new(
775        tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
776        null_pages: Vec<u32>,
777        store: Arc<dyn IndexStore>,
778        index_cache: WeakLanceCache,
779        sub_index: Arc<dyn BTreeSubIndex>,
780        batch_size: u64,
781        frag_reuse_index: Option<Arc<FragReuseIndex>>,
782    ) -> Self {
783        let page_lookup = Arc::new(BTreeLookup::new(tree, null_pages));
784        Self {
785            page_lookup,
786            store,
787            index_cache,
788            sub_index,
789            batch_size,
790            frag_reuse_index,
791        }
792    }
793
794    async fn lookup_page(
795        &self,
796        page_number: u32,
797        index_reader: LazyIndexReader,
798        metrics: &dyn MetricsCollector,
799    ) -> Result<Arc<dyn ScalarIndex>> {
800        self.index_cache
801            .get_or_insert_with_key(BTreePageKey { page_number }, move || async move {
802                let result = self.read_page(page_number, index_reader, metrics).await?;
803                Ok(CachedScalarIndex::new(result))
804            })
805            .await
806            .map(|v| v.as_ref().clone().into_inner())
807    }
808
809    async fn read_page(
810        &self,
811        page_number: u32,
812        index_reader: LazyIndexReader,
813        metrics: &dyn MetricsCollector,
814    ) -> Result<Arc<dyn ScalarIndex>> {
815        metrics.record_part_load();
816        info!(target: TRACE_IO_EVENTS, r#type=IO_TYPE_LOAD_SCALAR_PART, index_type="btree", part_id=page_number);
817        let index_reader = index_reader.get().await?;
818        let mut serialized_page = index_reader
819            .read_record_batch(page_number as u64, self.batch_size)
820            .await?;
821        if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
822            serialized_page =
823                frag_reuse_index_ref.remap_row_ids_record_batch(serialized_page, 1)?;
824        }
825        let result = self.sub_index.load_subindex(serialized_page).await?;
826        Ok(result)
827    }
828
829    async fn search_page(
830        &self,
831        query: &SargableQuery,
832        page_number: u32,
833        index_reader: LazyIndexReader,
834        metrics: &dyn MetricsCollector,
835    ) -> Result<RowIdTreeMap> {
836        let subindex = self.lookup_page(page_number, index_reader, metrics).await?;
837        // TODO: If this is an IN query we can perhaps simplify the subindex query by restricting it to the
838        // values that might be in the page.  E.g. if we are searching for X IN [5, 3, 7] and five is in pages
839        // 1 and 2 and three is in page 2 and seven is in pages 8 and 9, then when searching page 2 we only need
840        // to search for X IN [5, 3]
841        match subindex.search(query, metrics).await? {
842            SearchResult::Exact(map) => Ok(map),
843            _ => Err(Error::Internal {
844                message: "BTree sub-indices need to return exact results".to_string(),
845                location: location!(),
846            }),
847        }
848    }
849
850    fn try_from_serialized(
851        data: RecordBatch,
852        store: Arc<dyn IndexStore>,
853        index_cache: &LanceCache,
854        batch_size: u64,
855        frag_reuse_index: Option<Arc<FragReuseIndex>>,
856    ) -> Result<Self> {
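        // The serialized lookup holds one row per page with the columns
        // (min, max, null_count, page_idx) produced by btree_stats_as_batch, ordered by page min.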
857        let mut map = BTreeMap::<OrderableScalarValue, Vec<PageRecord>>::new();
858        let mut null_pages = Vec::<u32>::new();
859
860        if data.num_rows() == 0 {
861            let data_type = data.column(0).data_type().clone();
862            let sub_index = Arc::new(FlatIndexMetadata::new(data_type));
863            return Ok(Self::new(
864                map,
865                null_pages,
866                store,
867                WeakLanceCache::from(index_cache),
868                sub_index,
869                batch_size,
870                frag_reuse_index,
871            ));
872        }
873
874        let mins = data.column(0);
875        let maxs = data.column(1);
876        let null_counts = data
877            .column(2)
878            .as_any()
879            .downcast_ref::<UInt32Array>()
880            .unwrap();
881        let page_numbers = data
882            .column(3)
883            .as_any()
884            .downcast_ref::<UInt32Array>()
885            .unwrap();
886
887        for idx in 0..data.num_rows() {
888            let min = OrderableScalarValue(ScalarValue::try_from_array(&mins, idx)?);
889            let max = OrderableScalarValue(ScalarValue::try_from_array(&maxs, idx)?);
890            let null_count = null_counts.values()[idx];
891            let page_number = page_numbers.values()[idx];
892
893            // If the page is entirely null don't even bother putting it in the tree
894            if !max.0.is_null() {
895                map.entry(min)
896                    .or_default()
897                    .push(PageRecord { max, page_number });
898            }
899
900            if null_count > 0 {
901                null_pages.push(page_number);
902            }
903        }
904
905        let last_max = ScalarValue::try_from_array(&maxs, data.num_rows() - 1)?;
906        map.entry(OrderableScalarValue(last_max)).or_default();
907
908        let data_type = mins.data_type();
909
910        // TODO: Support other page types?
911        let sub_index = Arc::new(FlatIndexMetadata::new(data_type.clone()));
912
913        Ok(Self::new(
914            map,
915            null_pages,
916            store,
917            WeakLanceCache::from(index_cache),
918            sub_index,
919            batch_size,
920            frag_reuse_index,
921        ))
922    }
923
924    async fn load(
925        store: Arc<dyn IndexStore>,
926        frag_reuse_index: Option<Arc<FragReuseIndex>>,
927        index_cache: &LanceCache,
928    ) -> Result<Arc<Self>> {
929        let page_lookup_file = store.open_index_file(BTREE_LOOKUP_NAME).await?;
930        let num_rows_in_lookup = page_lookup_file.num_rows();
931        let serialized_lookup = page_lookup_file
932            .read_range(0..num_rows_in_lookup, None)
933            .await?;
934        let file_schema = page_lookup_file.schema();
935        let batch_size = file_schema
936            .metadata
937            .get(BATCH_SIZE_META_KEY)
938            .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
939            .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
940        Ok(Arc::new(Self::try_from_serialized(
941            serialized_lookup,
942            store,
943            index_cache,
944            batch_size,
945            frag_reuse_index,
946        )?))
947    }
948
949    /// Create a stream of all the data in the index, in the same format used to train the index
950    async fn into_data_stream(self) -> Result<SendableRecordBatchStream> {
951        let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
952        let schema = self.sub_index.schema().clone();
953        let value_field = schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
954        let row_id_field = schema.field(1).clone().with_name(ROW_ID);
955        let new_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
956        let new_schema_clone = new_schema.clone();
957        let reader_stream = IndexReaderStream::new(reader, self.batch_size).await;
958        let batches = reader_stream
959            .map(|fut| fut.map_err(DataFusionError::from))
960            .buffered(self.store.io_parallelism())
961            .map_ok(move |batch| {
962                RecordBatch::try_new(
963                    new_schema.clone(),
964                    vec![batch.column(0).clone(), batch.column(1).clone()],
965                )
966                .unwrap()
967            })
968            .boxed();
969        Ok(Box::pin(RecordBatchStreamAdapter::new(
970            new_schema_clone,
971            batches,
972        )))
973    }
974
975    async fn into_old_data(self) -> Result<Arc<dyn ExecutionPlan>> {
976        let stream = self.into_data_stream().await?;
977        Ok(Arc::new(OneShotExec::new(stream)))
978    }
979
980    async fn combine_old_new(
981        self,
982        new_data: SendableRecordBatchStream,
983        chunk_size: u64,
984    ) -> Result<SendableRecordBatchStream> {
985        let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?;
986
987        let new_input = Arc::new(OneShotExec::new(new_data));
988        let old_input = self.into_old_data().await?;
989        debug_assert_eq!(
990            old_input.schema().flattened_fields().len(),
991            new_input.schema().flattened_fields().len()
992        );
993
994        let sort_expr = PhysicalSortExpr {
995            expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
996            options: SortOptions {
997                descending: false,
998                nulls_first: true,
999            },
1000        };
1001        // The UnionExec creates multiple partitions but the SortPreservingMergeExec merges
1002        // them back into a single partition.
1003        let all_data = Arc::new(UnionExec::new(vec![old_input, new_input]));
1004        let ordered = Arc::new(SortPreservingMergeExec::new([sort_expr].into(), all_data));
1005
1006        let unchunked = execute_plan(
1007            ordered,
1008            LanceExecutionOptions {
1009                use_spilling: true,
1010                ..Default::default()
1011            },
1012        )?;
1013        Ok(chunk_concat_stream(unchunked, chunk_size as usize))
1014    }
1015}
1016
1017fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
1018    match bound {
1019        Bound::Unbounded => Bound::Unbounded,
1020        Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
1021        Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
1022    }
1023}
1024
1025fn serialize_with_display<T: Display, S: Serializer>(
1026    value: &Option<T>,
1027    serializer: S,
1028) -> std::result::Result<S::Ok, S::Error> {
1029    if let Some(value) = value {
1030        serializer.collect_str(value)
1031    } else {
1032        serializer.collect_str("N/A")
1033    }
1034}
1035
1036#[derive(Serialize)]
1037struct BTreeStatistics {
1038    #[serde(serialize_with = "serialize_with_display")]
1039    min: Option<OrderableScalarValue>,
1040    #[serde(serialize_with = "serialize_with_display")]
1041    max: Option<OrderableScalarValue>,
1042    num_pages: u32,
1043}
1044
1045#[async_trait]
1046impl Index for BTreeIndex {
1047    fn as_any(&self) -> &dyn Any {
1048        self
1049    }
1050
1051    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
1052        self
1053    }
1054
1055    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
1056        Err(Error::NotSupported {
1057            source: "BTreeIndex is not vector index".into(),
1058            location: location!(),
1059        })
1060    }
1061
1062    async fn prewarm(&self) -> Result<()> {
1063        let index_reader = LazyIndexReader::new(self.store.clone());
1064        let reader = index_reader.get().await?;
1065        let num_rows = reader.num_rows();
1066        let batch_size = self.batch_size as usize;
1067        let num_pages = num_rows.div_ceil(batch_size);
1068        let mut pages = stream::iter(0..num_pages)
1069            .map(|page_idx| {
1070                let index_reader = index_reader.clone();
1071                let page_idx = page_idx as u32;
1072                async move {
1073                    let page = self
1074                        .read_page(page_idx, index_reader, &NoOpMetricsCollector)
1075                        .await?;
1076                    Result::Ok((page_idx, page))
1077                }
1078            })
1079            .buffer_unordered(get_num_compute_intensive_cpus());
1080
1081        while let Some((page_idx, page)) = pages.try_next().await? {
1082            let inserted = self
1083                .index_cache
1084                .insert_with_key(
1085                    &BTreePageKey {
1086                        page_number: page_idx,
1087                    },
1088                    Arc::new(CachedScalarIndex::new(page)),
1089                )
1090                .await;
1091
1092            if !inserted {
1093                return Err(Error::Internal {
1094                    message: "Failed to prewarm index: cache is no longer available".to_string(),
1095                    location: location!(),
1096                });
1097            }
1098        }
1099
1100        Ok(())
1101    }
1102
1103    fn index_type(&self) -> IndexType {
1104        IndexType::BTree
1105    }
1106
1107    fn statistics(&self) -> Result<serde_json::Value> {
1108        let min = self
1109            .page_lookup
1110            .tree
1111            .first_key_value()
1112            .map(|(k, _)| k.clone());
1113        let max = self
1114            .page_lookup
1115            .tree
1116            .last_key_value()
1117            .map(|(k, _)| k.clone());
1118        serde_json::to_value(&BTreeStatistics {
1119            num_pages: self.page_lookup.tree.len() as u32,
1120            min,
1121            max,
1122        })
1123        .map_err(|err| err.into())
1124    }
1125
1126    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
1127        let mut frag_ids = RoaringBitmap::default();
1128
1129        let sub_index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
1130        let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1131            .await
1132            .buffered(self.store.io_parallelism());
1133        while let Some(serialized) = reader_stream.try_next().await? {
1134            let page = self.sub_index.load_subindex(serialized).await?;
1135            frag_ids |= page.calculate_included_frags().await?;
1136        }
1137
1138        Ok(frag_ids)
1139    }
1140}
1141
1142#[async_trait]
1143impl ScalarIndex for BTreeIndex {
1144    async fn search(
1145        &self,
1146        query: &dyn AnyQuery,
1147        metrics: &dyn MetricsCollector,
1148    ) -> Result<SearchResult> {
1149        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
1150        let pages = match query {
1151            SargableQuery::Equals(val) => self
1152                .page_lookup
1153                .pages_eq(&OrderableScalarValue(val.clone())),
1154            SargableQuery::Range(start, end) => self
1155                .page_lookup
1156                .pages_between((wrap_bound(start).as_ref(), wrap_bound(end).as_ref())),
1157            SargableQuery::IsIn(values) => self
1158                .page_lookup
1159                .pages_in(values.iter().map(|val| OrderableScalarValue(val.clone()))),
1160            SargableQuery::FullTextSearch(_) => return Err(Error::invalid_input(
1161                "full text search is not supported for BTree index, build an inverted index for it",
1162                location!(),
1163            )),
1164            SargableQuery::IsNull() => self.page_lookup.pages_null(),
1165        };
1166        let lazy_index_reader = LazyIndexReader::new(self.store.clone());
1167        let page_tasks = pages
1168            .into_iter()
1169            .map(|page_index| {
1170                self.search_page(query, page_index, lazy_index_reader.clone(), metrics)
1171                    .boxed()
1172            })
1173            .collect::<Vec<_>>();
1174        debug!("Searching {} btree pages", page_tasks.len());
1175        let row_ids = stream::iter(page_tasks)
1176            // I/O and compute are mixed here, but the important case is when the index is
1177            // already cached, so use the compute-intensive thread count
1178            .buffered(get_num_compute_intensive_cpus())
1179            .try_collect::<RowIdTreeMap>()
1180            .await?;
1181        Ok(SearchResult::Exact(row_ids))
1182    }
1183
1184    fn can_remap(&self) -> bool {
1185        true
1186    }
1187
1188    async fn remap(
1189        &self,
1190        mapping: &HashMap<u64, Option<u64>>,
1191        dest_store: &dyn IndexStore,
1192    ) -> Result<CreatedIndex> {
1193        // Remap and write the pages
1194        let mut sub_index_file = dest_store
1195            .new_index_file(BTREE_PAGES_NAME, self.sub_index.schema().clone())
1196            .await?;
1197
1198        let sub_index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
1199        let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1200            .await
1201            .buffered(self.store.io_parallelism());
1202        while let Some(serialized) = reader_stream.try_next().await? {
1203            let remapped = self.sub_index.remap_subindex(serialized, mapping).await?;
1204            sub_index_file.write_record_batch(remapped).await?;
1205        }
1206
1207        sub_index_file.finish().await?;
1208
1209        // Copy the lookup file as-is
1210        self.store
1211            .copy_index_file(BTREE_LOOKUP_NAME, dest_store)
1212            .await?;
1213
1214        Ok(CreatedIndex {
1215            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1216                .unwrap(),
1217            index_version: BTREE_INDEX_VERSION,
1218        })
1219    }
1220
1221    async fn update(
1222        &self,
1223        new_data: SendableRecordBatchStream,
1224        dest_store: &dyn IndexStore,
1225    ) -> Result<CreatedIndex> {
1226        // Merge the existing index data with the new data and then retrain the index on the merged stream
1227        let merged_data_source = self
1228            .clone()
1229            .combine_old_new(new_data, self.batch_size)
1230            .await?;
1231        train_btree_index(
1232            merged_data_source,
1233            self.sub_index.as_ref(),
1234            dest_store,
1235            self.batch_size,
1236            None,
1237        )
1238        .await?;
1239
1240        Ok(CreatedIndex {
1241            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1242                .unwrap(),
1243            index_version: BTREE_INDEX_VERSION,
1244        })
1245    }
1246
1247    fn update_criteria(&self) -> UpdateCriteria {
1248        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
1249    }
1250
1251    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
1252        let params = serde_json::to_value(BTreeParameters {
1253            zone_size: Some(self.batch_size),
1254        })?;
1255        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::BTree).with_params(&params))
1256    }
1257}
1258
1259struct BatchStats {
1260    min: ScalarValue,
1261    max: ScalarValue,
1262    null_count: u32,
1263}
1264
1265fn analyze_batch(batch: &RecordBatch) -> Result<BatchStats> {
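    // The training stream is sorted by value, so the first row of the batch holds the page
    // min and the last row holds the page max.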
1266    let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1267    if values.is_empty() {
1268        return Err(Error::Internal {
1269            message: "received an empty batch in btree training".to_string(),
1270            location: location!(),
1271        });
1272    }
1273    let min = ScalarValue::try_from_array(&values, 0).map_err(|e| Error::Internal {
1274        message: format!("failed to get min value from batch: {}", e),
1275        location: location!(),
1276    })?;
1277    let max =
1278        ScalarValue::try_from_array(&values, values.len() - 1).map_err(|e| Error::Internal {
1279            message: format!("failed to get max value from batch: {}", e),
1280            location: location!(),
1281        })?;
1282
1283    Ok(BatchStats {
1284        min,
1285        max,
1286        null_count: values.null_count() as u32,
1287    })
1288}
1289
1290/// A trait that must be implemented by anything that wishes to act as a btree subindex
1291#[async_trait]
1292pub trait BTreeSubIndex: Debug + Send + Sync + DeepSizeOf {
1293    /// Trains the subindex on a single batch of data and serializes it to Arrow
1294    async fn train(&self, batch: RecordBatch) -> Result<RecordBatch>;
1295
1296    /// Deserialize a subindex from Arrow
1297    async fn load_subindex(&self, serialized: RecordBatch) -> Result<Arc<dyn ScalarIndex>>;
1298
1299    /// Retrieve the data used to originally train this page
1300    ///
1301    /// In order to perform an update we need to merge the old data in with the new data which
1302    /// means we need to access the new data.  Right now this is convenient for flat indices but
1303    /// means we need to access the old data.  Right now this is convenient for flat indices but
1304    /// flat
1305    async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch>;
1306
1307    /// The schema of the subindex when serialized to Arrow
1308    fn schema(&self) -> &Arc<Schema>;
1309
1310    /// Given a serialized page, deserialize it, remap the row ids, and re-serialize it
1311    async fn remap_subindex(
1312        &self,
1313        serialized: RecordBatch,
1314        mapping: &HashMap<u64, Option<u64>>,
1315    ) -> Result<RecordBatch>;
1316}
1317
1318struct EncodedBatch {
1319    stats: BatchStats,
1320    page_number: u32,
1321}
1322
1323async fn train_btree_page(
1324    batch: RecordBatch,
1325    batch_idx: u32,
1326    sub_index_trainer: &dyn BTreeSubIndex,
1327    writer: &mut dyn IndexWriter,
1328) -> Result<EncodedBatch> {
1329    let stats = analyze_batch(&batch)?;
1330    let trained = sub_index_trainer.train(batch).await?;
1331    writer.write_record_batch(trained).await?;
1332    Ok(EncodedBatch {
1333        stats,
1334        page_number: batch_idx,
1335    })
1336}
1337
1338fn btree_stats_as_batch(stats: Vec<EncodedBatch>, value_type: &DataType) -> Result<RecordBatch> {
1339    let mins = if stats.is_empty() {
1340        new_empty_array(value_type)
1341    } else {
1342        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?
1343    };
1344    let maxs = if stats.is_empty() {
1345        new_empty_array(value_type)
1346    } else {
1347        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?
1348    };
1349    let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count));
1350    let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number));
1351
1352    let schema = Arc::new(Schema::new(vec![
1353        // min and max can be null if the entire batch is null values
1354        Field::new("min", mins.data_type().clone(), true),
1355        Field::new("max", maxs.data_type().clone(), true),
1356        Field::new("null_count", null_counts.data_type().clone(), false),
1357        Field::new("page_idx", page_numbers.data_type().clone(), false),
1358    ]));
1359
1360    let columns = vec![
1361        mins,
1362        maxs,
1363        Arc::new(null_counts) as Arc<dyn Array>,
1364        Arc::new(page_numbers) as Arc<dyn Array>,
1365    ];
1366
1367    Ok(RecordBatch::try_new(schema, columns)?)
1368}
1369
1370/// Train a btree index from a stream of sorted page-size batches of values and row ids
1371///
1372/// Note: This is likely to change.  It is unreasonable to expect the caller to do the sorting
1373/// and re-chunking into page-size batches.  This is left for simplicity as this feature is still
1374/// a work in progress
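///
/// A rough sketch of the expected call pattern (illustrative only; `sorted_batches` and
/// `store` are assumed to be a value-sorted `SendableRecordBatchStream` and an
/// `Arc<dyn IndexStore>` created elsewhere):
///
/// ```ignore
/// let sub_index = FlatIndexMetadata::new(DataType::Int32);
/// train_btree_index(
///     sorted_batches,
///     &sub_index,
///     store.as_ref(),
///     DEFAULT_BTREE_BATCH_SIZE,
///     None,
/// )
/// .await?;
/// ```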
1375pub async fn train_btree_index(
1376    batches_source: SendableRecordBatchStream,
1377    sub_index_trainer: &dyn BTreeSubIndex,
1378    index_store: &dyn IndexStore,
1379    batch_size: u64,
1380    fragment_ids: Option<Vec<u32>>,
1381) -> Result<()> {
1382    let fragment_mask = fragment_ids.as_ref().and_then(|frag_ids| {
1383        if !frag_ids.is_empty() {
1384            // Create a mask with fragment_id in high 32 bits for distributed indexing
1385            // This mask is used to filter partitions belonging to specific fragments
1386            // If multiple fragments are processed, use the first fragment_id << 32 as the mask
1387            Some((frag_ids[0] as u64) << 32)
1388        } else {
1389            None
1390        }
1391    });
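    // e.g. fragment_ids = Some(vec![3]) produces a mask of 3u64 << 32, which is used below to
    // derive the partition-specific page / lookup file names.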
1392
1393    let mut sub_index_file;
1394    if fragment_mask.is_none() {
1395        sub_index_file = index_store
1396            .new_index_file(BTREE_PAGES_NAME, sub_index_trainer.schema().clone())
1397            .await?;
1398    } else {
1399        sub_index_file = index_store
1400            .new_index_file(
1401                part_page_data_file_path(fragment_mask.unwrap()).as_str(),
1402                sub_index_trainer.schema().clone(),
1403            )
1404            .await?;
1405    }
1406
1407    let mut encoded_batches = Vec::new();
1408    let mut batch_idx = 0;
1409
1410    let value_type = batches_source
1411        .schema()
1412        .field_with_name(VALUE_COLUMN_NAME)?
1413        .data_type()
1414        .clone();
1415
1416    let mut batches_source = chunk_concat_stream(batches_source, batch_size as usize);
1417
1418    while let Some(batch) = batches_source.try_next().await? {
1419        encoded_batches.push(
1420            train_btree_page(batch, batch_idx, sub_index_trainer, sub_index_file.as_mut()).await?,
1421        );
1422        batch_idx += 1;
1423    }
1424    sub_index_file.finish().await?;
1425    let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?;
1426    let mut file_schema = record_batch.schema().as_ref().clone();
1427    file_schema
1428        .metadata
1429        .insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
    let mut btree_index_file = if let Some(mask) = fragment_mask {
        index_store
            .new_index_file(
                part_lookup_file_path(mask).as_str(),
                Arc::new(file_schema),
            )
            .await?
    } else {
        index_store
            .new_index_file(BTREE_LOOKUP_NAME, Arc::new(file_schema))
            .await?
    };
1443    btree_index_file.write_record_batch(record_batch).await?;
1444    btree_index_file.finish().await?;
1445    Ok(())
1446}
1447
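/// Merge the partition page / lookup files previously written by distributed workers into the
/// final `page_data.lance` / `page_lookup.lance` pair, cleaning up the partition files afterwards.
///
/// A rough sketch of the driver-side call (the `object_store`, `index_dir`, and `store` values
/// here are illustrative):
///
/// ```ignore
/// // Workers call `train_btree_index` with `Some(fragment_ids)`, producing
/// // part_*_page_data.lance and part_*_page_lookup.lance files.  A single driver then merges them:
/// merge_index_files(&object_store, &index_dir, store.clone(), Some(1)).await?;
/// ```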
1448pub async fn merge_index_files(
1449    object_store: &ObjectStore,
1450    index_dir: &Path,
1451    store: Arc<dyn IndexStore>,
1452    batch_readhead: Option<usize>,
1453) -> Result<()> {
1454    // List all partition page / lookup files in the index directory
1455    let (part_page_files, part_lookup_files) =
1456        list_page_lookup_files(object_store, index_dir).await?;
1457    merge_metadata_files(store, &part_page_files, &part_lookup_files, batch_readhead).await
1458}
1459
1460/// List and filter files from the index directory
1461/// Returns (page_files, lookup_files)
1462async fn list_page_lookup_files(
1463    object_store: &ObjectStore,
1464    index_dir: &Path,
1465) -> Result<(Vec<String>, Vec<String>)> {
1466    let mut part_page_files = Vec::new();
1467    let mut part_lookup_files = Vec::new();
1468
1469    let mut list_stream = object_store.list(Some(index_dir.clone()));
1470
1471    while let Some(item) = list_stream.next().await {
1472        match item {
1473            Ok(meta) => {
1474                let file_name = meta.location.filename().unwrap_or_default();
1475                // Filter files matching the pattern part_*_page_data.lance
1476                if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") {
1477                    part_page_files.push(file_name.to_string());
1478                }
1479                // Filter files matching the pattern part_*_page_lookup.lance
1480                if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") {
1481                    part_lookup_files.push(file_name.to_string());
1482                }
1483            }
1484            Err(_) => continue,
1485        }
1486    }
1487
1488    if part_page_files.is_empty() || part_lookup_files.is_empty() {
1489        return Err(Error::Internal {
1490            message: format!(
1491                "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})",
1492                index_dir, part_page_files.len(), part_lookup_files.len()
1493            ),
1494            location: location!(),
1495        });
1496    }
1497
1498    Ok((part_page_files, part_lookup_files))
1499}
1500
1501/// Merge multiple partition page / lookup files into a complete metadata file
1502///
1503/// In a distributed environment, each worker node writes partition page / lookup files for the partitions it processes,
1504/// and this function merges these files into a final metadata file.
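///
/// The partition files are expected to follow the `part_{partition_id}_page_data.lance` /
/// `part_{partition_id}_page_lookup.lance` naming produced by `train_btree_index`.  For example
/// (assuming fragments 1 and 2 were trained separately, as in the tests below):
///
/// ```ignore
/// let part_page_files = vec![part_page_data_file_path(1 << 32), part_page_data_file_path(2 << 32)];
/// let part_lookup_files = vec![part_lookup_file_path(1 << 32), part_lookup_file_path(2 << 32)];
/// merge_metadata_files(store.clone(), &part_page_files, &part_lookup_files, None).await?;
/// ```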
1505async fn merge_metadata_files(
1506    store: Arc<dyn IndexStore>,
1507    part_page_files: &[String],
1508    part_lookup_files: &[String],
1509    batch_readhead: Option<usize>,
1510) -> Result<()> {
1511    if part_lookup_files.is_empty() || part_page_files.is_empty() {
1512        return Err(Error::Internal {
1513            message: "No partition files provided for merging".to_string(),
1514            location: location!(),
1515        });
1516    }
1517
    // Step 1: Validate that the file counts match and build a map of page files keyed by partition ID
1519    if part_lookup_files.len() != part_page_files.len() {
1520        return Err(Error::Internal {
1521            message: format!(
1522                "Number of partition lookup files ({}) does not match number of partition page files ({})",
1523                part_lookup_files.len(),
1524                part_page_files.len()
1525            ),
1526            location: location!(),
1527        });
1528    }
1529    let mut page_files_map = HashMap::new();
1530    for page_file in part_page_files {
1531        let partition_id = extract_partition_id(page_file)?;
1532        page_files_map.insert(partition_id, page_file);
1533    }
1534
1535    // Step 2: Validate that all lookup files have corresponding page files
1536    for lookup_file in part_lookup_files {
1537        let partition_id = extract_partition_id(lookup_file)?;
1538        if !page_files_map.contains_key(&partition_id) {
1539            return Err(Error::Internal {
1540                message: format!(
1541                    "No corresponding page file found for lookup file: {} (partition_id: {})",
1542                    lookup_file, partition_id
1543                ),
1544                location: location!(),
1545            });
1546        }
1547    }
1548
1549    // Step 3: Extract metadata from lookup files
1550    let first_lookup_reader = store.open_index_file(&part_lookup_files[0]).await?;
1551    let batch_size = first_lookup_reader
1552        .schema()
1553        .metadata
1554        .get(BATCH_SIZE_META_KEY)
1555        .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
1556        .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
1557
1558    // Get the value type from lookup schema (min column)
1559    let value_type = first_lookup_reader
1560        .schema()
1561        .fields
1562        .first()
1563        .unwrap()
1564        .data_type();
1565
    // Get the page schema from the first partition's page file
1567    let partition_id = extract_partition_id(part_lookup_files[0].as_str())?;
1568    let page_file = page_files_map.get(&partition_id).unwrap();
1569    let page_reader = store.open_index_file(page_file).await?;
1570    let page_schema = page_reader.schema().clone();
1571
1572    let arrow_schema = Arc::new(Schema::from(&page_schema));
1573    let mut page_file = store
1574        .new_index_file(BTREE_PAGES_NAME, arrow_schema.clone())
1575        .await?;
1576
1577    // Step 4: Merge pages and create lookup entries
1578    let lookup_entries = merge_pages(
1579        part_lookup_files,
1580        &page_files_map,
1581        &store,
1582        batch_size,
1583        &mut page_file,
1584        arrow_schema.clone(),
1585        batch_readhead,
1586    )
1587    .await?;
1588
1589    page_file.finish().await?;
1590
1591    // Step 5: Generate new lookup file based on reorganized pages
1592    // Add batch_size to schema metadata
1593    let mut metadata = HashMap::new();
1594    metadata.insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
1595
1596    let lookup_schema_with_metadata = Arc::new(Schema::new_with_metadata(
1597        vec![
1598            Field::new("min", value_type.clone(), true),
1599            Field::new("max", value_type, true),
1600            Field::new("null_count", DataType::UInt32, false),
1601            Field::new("page_idx", DataType::UInt32, false),
1602        ],
1603        metadata,
1604    ));
1605
1606    let lookup_batch = RecordBatch::try_new(
1607        lookup_schema_with_metadata.clone(),
1608        vec![
1609            ScalarValue::iter_to_array(lookup_entries.iter().map(|(min, _, _, _)| min.clone()))?,
1610            ScalarValue::iter_to_array(lookup_entries.iter().map(|(_, max, _, _)| max.clone()))?,
1611            Arc::new(UInt32Array::from_iter_values(
1612                lookup_entries
1613                    .iter()
1614                    .map(|(_, _, null_count, _)| *null_count),
1615            )),
1616            Arc::new(UInt32Array::from_iter_values(
1617                lookup_entries.iter().map(|(_, _, _, page_idx)| *page_idx),
1618            )),
1619        ],
1620    )?;
1621
1622    let mut lookup_file = store
1623        .new_index_file(BTREE_LOOKUP_NAME, lookup_schema_with_metadata)
1624        .await?;
1625    lookup_file.write_record_batch(lookup_batch).await?;
1626    lookup_file.finish().await?;
1627
    // After the merged files have been written successfully, delete all partition files.
    // Deletion happens only after the writes succeed, so the partition files remain
    // available for debugging if the merge fails.
1630    cleanup_partition_files(&store, part_lookup_files, part_page_files).await;
1631
1632    Ok(())
1633}
1634
/// Merge pages using DataFusion's SortPreservingMergeExec, which performs a k-way merge of the
/// sorted partition streams; the merged stream is then re-chunked into fixed-size page batches.
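///
/// Conceptually the physical plan looks like this (a sketch):
///
/// ```text
/// SortPreservingMergeExec (sort by "value", ascending, nulls first)
///   UnionExec
///     OneShotExec(partition 1 page stream)
///     OneShotExec(partition 2 page stream)
///     ...
/// ```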
1637async fn merge_pages(
1638    part_lookup_files: &[String],
1639    page_files_map: &HashMap<u64, &String>,
1640    store: &Arc<dyn IndexStore>,
1641    batch_size: u64,
1642    page_file: &mut Box<dyn IndexWriter>,
1643    arrow_schema: Arc<Schema>,
1644    batch_readhead: Option<usize>,
1645) -> Result<Vec<(ScalarValue, ScalarValue, u32, u32)>> {
1646    let mut lookup_entries = Vec::new();
1647    let mut page_idx = 0u32;
1648
1649    debug!(
1650        "Starting SortPreservingMerge with {} partitions",
1651        part_lookup_files.len()
1652    );
1653
1654    let value_field = arrow_schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
1655    let row_id_field = arrow_schema.field(1).clone().with_name(ROW_ID);
1656    let stream_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
1657
1658    // Create execution plans for each stream
1659    let mut inputs: Vec<Arc<dyn ExecutionPlan>> = Vec::new();
1660    for lookup_file in part_lookup_files {
1661        let partition_id = extract_partition_id(lookup_file)?;
1662        let page_file_name =
1663            (*page_files_map
1664                .get(&partition_id)
1665                .ok_or_else(|| Error::Internal {
1666                    message: format!("Page file not found for partition ID: {}", partition_id),
1667                    location: location!(),
1668                })?)
1669            .clone();
1670
1671        let reader = store.open_index_file(&page_file_name).await?;
1672
1673        let reader_stream = IndexReaderStream::new(reader, batch_size).await;
1674
1675        let stream = reader_stream
1676            .map(|fut| fut.map_err(DataFusionError::from))
1677            .buffered(batch_readhead.unwrap_or(1))
1678            .boxed();
1679
1680        let sendable_stream =
1681            Box::pin(RecordBatchStreamAdapter::new(stream_schema.clone(), stream));
1682        inputs.push(Arc::new(OneShotExec::new(sendable_stream)));
1683    }
1684
1685    // Create Union execution plan to combine all partitions
1686    let union_inputs = Arc::new(UnionExec::new(inputs));
1687
1688    // Create SortPreservingMerge execution plan
1689    let value_column_index = stream_schema.index_of(VALUE_COLUMN_NAME)?;
1690    let sort_expr = PhysicalSortExpr {
1691        expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
1692        options: SortOptions {
1693            descending: false,
1694            nulls_first: true,
1695        },
1696    };
1697
1698    let merge_exec = Arc::new(SortPreservingMergeExec::new(
1699        [sort_expr].into(),
1700        union_inputs,
1701    ));
1702
1703    let unchunked = execute_plan(
1704        merge_exec,
1705        LanceExecutionOptions {
1706            use_spilling: false,
1707            ..Default::default()
1708        },
1709    )?;
1710
1711    // Use chunk_concat_stream to ensure fixed batch sizes
1712    let mut chunked_stream = chunk_concat_stream(unchunked, batch_size as usize);
1713
1714    // Process chunked stream
1715    while let Some(batch) = chunked_stream.try_next().await? {
1716        let writer_batch = RecordBatch::try_new(
1717            arrow_schema.clone(),
1718            vec![batch.column(0).clone(), batch.column(1).clone()],
1719        )?;
1720
1721        page_file.write_record_batch(writer_batch).await?;
1722
1723        let min_val = ScalarValue::try_from_array(batch.column(0), 0)?;
1724        let max_val = ScalarValue::try_from_array(batch.column(0), batch.num_rows() - 1)?;
1725        let null_count = batch.column(0).null_count() as u32;
1726
1727        lookup_entries.push((min_val, max_val, null_count, page_idx));
1728        page_idx += 1;
1729    }
1730
1731    Ok(lookup_entries)
1732}
1733
1734/// Extract partition ID from partition file name
1735/// Expected format: "part_{partition_id}_{suffix}.lance"
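///
/// For example, `"part_42_page_data.lance"` yields `42`; names without the `part_` prefix or
/// with a non-numeric partition id are rejected.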
1736fn extract_partition_id(filename: &str) -> Result<u64> {
1737    if !filename.starts_with("part_") {
1738        return Err(Error::Internal {
1739            message: format!("Invalid partition file name format: {}", filename),
1740            location: location!(),
1741        });
1742    }
1743
1744    let parts: Vec<&str> = filename.split('_').collect();
1745    if parts.len() < 3 {
1746        return Err(Error::Internal {
1747            message: format!("Invalid partition file name format: {}", filename),
1748            location: location!(),
1749        });
1750    }
1751
1752    parts[1].parse::<u64>().map_err(|_| Error::Internal {
1753        message: format!("Failed to parse partition ID from filename: {}", filename),
1754        location: location!(),
1755    })
1756}
1757
1758/// Clean up partition files after successful merge
1759///
1760/// This function safely deletes partition lookup and page files after a successful merge operation.
1761/// File deletion failures are logged but do not affect the overall success of the merge operation.
1762async fn cleanup_partition_files(
1763    store: &Arc<dyn IndexStore>,
1764    part_lookup_files: &[String],
1765    part_page_files: &[String],
1766) {
1767    // Clean up partition lookup files
1768    for file_name in part_lookup_files {
1769        cleanup_single_file(
1770            store,
1771            file_name,
1772            "part_",
1773            "_page_lookup.lance",
1774            "partition lookup",
1775        )
1776        .await;
1777    }
1778
1779    // Clean up partition page files
1780    for file_name in part_page_files {
1781        cleanup_single_file(
1782            store,
1783            file_name,
1784            "part_",
1785            "_page_data.lance",
1786            "partition page",
1787        )
1788        .await;
1789    }
1790}
1791
1792/// Helper function to clean up a single partition file
1793///
1794/// Performs safety checks on the filename pattern before attempting deletion.
1795async fn cleanup_single_file(
1796    store: &Arc<dyn IndexStore>,
1797    file_name: &str,
1798    expected_prefix: &str,
1799    expected_suffix: &str,
1800    file_type: &str,
1801) {
1802    if file_name.starts_with(expected_prefix) && file_name.ends_with(expected_suffix) {
1803        match store.delete_index_file(file_name).await {
1804            Ok(()) => {
1805                debug!("Successfully deleted {} file: {}", file_type, file_name);
1806            }
1807            Err(e) => {
1808                warn!(
1809                    "Failed to delete {} file '{}': {}. \
1810                    This does not affect the merge operation, but may leave \
1811                    partition files that should be cleaned up manually.",
1812                    file_type, file_name, e
1813                );
1814            }
1815        }
1816    } else {
1817        // If the filename doesn't match the expected format, log a warning but don't attempt deletion
1818        warn!(
1819            "Skipping deletion of file '{}' as it does not match the expected \
1820            {} file pattern ({}*{})",
1821            file_name, file_type, expected_prefix, expected_suffix
1822        );
1823    }
1824}
1825
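/// File name for a partition's page data, e.g. `part_4294967296_page_data.lance` for
/// partition id `1 << 32`.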
1826pub(crate) fn part_page_data_file_path(partition_id: u64) -> String {
1827    format!("part_{}_{}", partition_id, BTREE_PAGES_NAME)
1828}
1829
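/// File name for a partition's page lookup table, e.g. `part_4294967296_page_lookup.lance` for
/// partition id `1 << 32`.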
1830pub(crate) fn part_lookup_file_path(partition_id: u64) -> String {
1831    format!("part_{}_{}", partition_id, BTREE_LOOKUP_NAME)
1832}
1833
1834/// A stream that reads the original training data back out of the index
1835///
/// This is used when updating or merging the index
1837struct IndexReaderStream {
1838    reader: Arc<dyn IndexReader>,
1839    batch_size: u64,
1840    num_batches: u32,
1841    batch_idx: u32,
1842}
1843
1844impl IndexReaderStream {
1845    async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
1846        let num_batches = reader.num_batches(batch_size).await;
1847        Self {
1848            reader,
1849            batch_size,
1850            num_batches,
1851            batch_idx: 0,
1852        }
1853    }
1854}
1855
1856impl Stream for IndexReaderStream {
1857    type Item = BoxFuture<'static, Result<RecordBatch>>;
1858
1859    fn poll_next(
1860        self: std::pin::Pin<&mut Self>,
1861        _cx: &mut std::task::Context<'_>,
1862    ) -> std::task::Poll<Option<Self::Item>> {
1863        let this = self.get_mut();
1864        if this.batch_idx >= this.num_batches {
1865            return std::task::Poll::Ready(None);
1866        }
1867        let batch_num = this.batch_idx;
1868        this.batch_idx += 1;
1869        let reader_copy = this.reader.clone();
1870        let batch_size = this.batch_size;
1871        let read_task = async move {
1872            reader_copy
1873                .read_record_batch(batch_num as u64, batch_size)
1874                .await
1875        }
1876        .boxed();
1877        std::task::Poll::Ready(Some(read_task))
1878    }
1879}
1880
1881/// Parameters for a btree index
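///
/// The parameters are deserialized from the JSON string handed to `new_training_request`;
/// a minimal sketch:
///
/// ```ignore
/// let params: BTreeParameters = serde_json::from_str(r#"{"zone_size": 4096}"#)?;
/// ```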
1882#[derive(Debug, Serialize, Deserialize)]
1883pub struct BTreeParameters {
    /// The number of rows to include in each zone (btree page); defaults to `DEFAULT_BTREE_BATCH_SIZE` when unset
1885    pub zone_size: Option<u64>,
1886}
1887
1888struct BTreeTrainingRequest {
1889    parameters: BTreeParameters,
1890    criteria: TrainingCriteria,
1891}
1892
1893impl BTreeTrainingRequest {
1894    pub fn new(parameters: BTreeParameters) -> Self {
1895        Self {
1896            parameters,
1897            // BTree indexes need data sorted by the value column
1898            criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
1899        }
1900    }
1901}
1902
1903impl TrainingRequest for BTreeTrainingRequest {
1904    fn as_any(&self) -> &dyn std::any::Any {
1905        self
1906    }
1907
1908    fn criteria(&self) -> &TrainingCriteria {
1909        &self.criteria
1910    }
1911}
1912
1913#[derive(Debug, Default)]
1914pub struct BTreeIndexPlugin;
1915
1916#[async_trait]
1917impl ScalarIndexPlugin for BTreeIndexPlugin {
1918    fn name(&self) -> &str {
1919        "BTree"
1920    }
1921
1922    fn new_training_request(
1923        &self,
1924        params: &str,
1925        field: &Field,
1926    ) -> Result<Box<dyn TrainingRequest>> {
1927        if field.data_type().is_nested() {
1928            return Err(Error::InvalidInput {
1929                source: "A btree index can only be created on a non-nested field.".into(),
1930                location: location!(),
1931            });
1932        }
1933
1934        let params = serde_json::from_str::<BTreeParameters>(params)?;
1935        Ok(Box::new(BTreeTrainingRequest::new(params)))
1936    }
1937
1938    fn provides_exact_answer(&self) -> bool {
1939        true
1940    }
1941
1942    fn version(&self) -> u32 {
1943        BTREE_INDEX_VERSION
1944    }
1945
1946    fn new_query_parser(
1947        &self,
1948        index_name: String,
1949        _index_details: &prost_types::Any,
1950    ) -> Option<Box<dyn ScalarQueryParser>> {
1951        Some(Box::new(SargableQueryParser::new(index_name, false)))
1952    }
1953
1954    async fn train_index(
1955        &self,
1956        data: SendableRecordBatchStream,
1957        index_store: &dyn IndexStore,
1958        request: Box<dyn TrainingRequest>,
1959        fragment_ids: Option<Vec<u32>>,
1960    ) -> Result<CreatedIndex> {
1961        let request = request
1962            .as_any()
1963            .downcast_ref::<BTreeTrainingRequest>()
1964            .unwrap();
1965        let value_type = data
1966            .schema()
1967            .field_with_name(VALUE_COLUMN_NAME)?
1968            .data_type()
1969            .clone();
1970        let flat_index_trainer = FlatIndexMetadata::new(value_type);
1971        train_btree_index(
1972            data,
1973            &flat_index_trainer,
1974            index_store,
1975            request
1976                .parameters
1977                .zone_size
1978                .unwrap_or(DEFAULT_BTREE_BATCH_SIZE),
1979            fragment_ids,
1980        )
1981        .await?;
1982        Ok(CreatedIndex {
1983            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1984                .unwrap(),
1985            index_version: BTREE_INDEX_VERSION,
1986        })
1987    }
1988
1989    async fn load_index(
1990        &self,
1991        index_store: Arc<dyn IndexStore>,
1992        _index_details: &prost_types::Any,
1993        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1994        cache: &LanceCache,
1995    ) -> Result<Arc<dyn ScalarIndex>> {
1996        Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1997    }
1998}
1999
2000#[cfg(test)]
2001mod tests {
2002    use std::sync::atomic::Ordering;
2003    use std::{collections::HashMap, sync::Arc};
2004
2005    use arrow::datatypes::{Float32Type, Float64Type, Int32Type, UInt64Type};
2006    use arrow_array::FixedSizeListArray;
2007    use arrow_schema::DataType;
2008    use datafusion::{
2009        execution::{SendableRecordBatchStream, TaskContext},
2010        physical_plan::{sorts::sort::SortExec, stream::RecordBatchStreamAdapter, ExecutionPlan},
2011    };
2012    use datafusion_common::{DataFusionError, ScalarValue};
2013    use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
2014    use deepsize::DeepSizeOf;
2015    use futures::TryStreamExt;
2016    use lance_core::utils::tempfile::TempObjDir;
2017    use lance_core::{cache::LanceCache, utils::mask::RowIdTreeMap};
2018    use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt};
2019    use lance_datagen::{array, gen_batch, ArrayGeneratorExt, BatchCount, RowCount};
2020    use lance_io::object_store::ObjectStore;
2021
2022    use crate::metrics::LocalMetricsCollector;
2023    use crate::{
2024        metrics::NoOpMetricsCollector,
2025        scalar::{
2026            btree::{BTreeIndex, BTREE_PAGES_NAME},
2027            flat::FlatIndexMetadata,
2028            lance_format::LanceIndexStore,
2029            IndexStore, SargableQuery, ScalarIndex, SearchResult,
2030        },
2031    };
2032
2033    use super::{
2034        part_lookup_file_path, part_page_data_file_path, train_btree_index, OrderableScalarValue,
2035        DEFAULT_BTREE_BATCH_SIZE,
2036    };
2037    #[test]
2038    fn test_scalar_value_size() {
2039        let size_of_i32 = OrderableScalarValue(ScalarValue::Int32(Some(0))).deep_size_of();
2040        let size_of_many_i32 = OrderableScalarValue(ScalarValue::FixedSizeList(Arc::new(
2041            FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2042                vec![Some(vec![Some(0); 128])],
2043                128,
2044            ),
2045        )))
2046        .deep_size_of();
2047
2048        // deep_size_of should account for the rust type overhead
2049        assert!(size_of_i32 > 4);
2050        assert!(size_of_many_i32 > 128 * 4);
2051    }
2052
2053    #[tokio::test]
2054    async fn test_null_ids() {
2055        let tmpdir = TempObjDir::default();
2056        let test_store = Arc::new(LanceIndexStore::new(
2057            Arc::new(ObjectStore::local()),
2058            tmpdir.clone(),
2059            Arc::new(LanceCache::no_cache()),
2060        ));
2061
        // Generate 50,000 rows of random data with nulls injected on a repeating five-value pattern
2063        let stream = gen_batch()
2064            .col(
2065                "value",
2066                array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
2067            )
2068            .col("_rowid", array::step::<UInt64Type>())
2069            .into_df_stream(RowCount::from(5000), BatchCount::from(10));
2070        let sub_index_trainer = FlatIndexMetadata::new(DataType::Float32);
2071
2072        train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 5000, None)
2073            .await
2074            .unwrap();
2075
2076        let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2077            .await
2078            .unwrap();
2079
2080        assert_eq!(index.page_lookup.null_pages.len(), 10);
2081
2082        let remap_dir = TempObjDir::default();
2083        let remap_store = Arc::new(LanceIndexStore::new(
2084            Arc::new(ObjectStore::local()),
2085            remap_dir.clone(),
2086            Arc::new(LanceCache::no_cache()),
2087        ));
2088
2089        // Remap with a no-op mapping.  The remapped index should be identical to the original
2090        index
2091            .remap(&HashMap::default(), remap_store.as_ref())
2092            .await
2093            .unwrap();
2094
2095        let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
2096            .await
2097            .unwrap();
2098
2099        assert_eq!(remap_index.page_lookup, index.page_lookup);
2100
2101        let original_pages = test_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2102        let remapped_pages = remap_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2103
2104        assert_eq!(original_pages.num_rows(), remapped_pages.num_rows());
2105
2106        let original_data = original_pages
2107            .read_record_batch(0, original_pages.num_rows() as u64)
2108            .await
2109            .unwrap();
2110        let remapped_data = remapped_pages
2111            .read_record_batch(0, remapped_pages.num_rows() as u64)
2112            .await
2113            .unwrap();
2114
2115        assert_eq!(original_data, remapped_data);
2116    }
2117
2118    #[tokio::test]
2119    async fn test_nan_ordering() {
2120        let tmpdir = TempObjDir::default();
2121        let test_store = Arc::new(LanceIndexStore::new(
2122            Arc::new(ObjectStore::local()),
2123            tmpdir.clone(),
2124            Arc::new(LanceCache::no_cache()),
2125        ));
2126
2127        let values = vec![
2128            0.0,
2129            1.0,
2130            2.0,
2131            3.0,
2132            f64::NAN,
2133            f64::NEG_INFINITY,
2134            f64::INFINITY,
2135        ];
2136
2137        // This is a bit overkill but we've had bugs in the past where DF's sort
2138        // didn't agree with Arrow's sort so we do an end-to-end test here
2139        // and use DF to sort the data like we would in a real dataset.
2140        let data = gen_batch()
2141            .col("value", array::cycle::<Float64Type>(values.clone()))
2142            .col("_rowid", array::step::<UInt64Type>())
2143            .into_df_exec(RowCount::from(10), BatchCount::from(100));
2144        let schema = data.schema();
2145        let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2146        let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2147        let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2148        let stream = break_stream(stream, 64);
2149        let stream = stream.map_err(DataFusionError::from);
2150        let stream =
2151            Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2152
2153        let sub_index_trainer = FlatIndexMetadata::new(DataType::Float64);
2154
2155        train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 64, None)
2156            .await
2157            .unwrap();
2158
2159        let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2160            .await
2161            .unwrap();
2162
2163        for (idx, value) in values.into_iter().enumerate() {
2164            let query = SargableQuery::Equals(ScalarValue::Float64(Some(value)));
2165            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2166            assert_eq!(
2167                result,
2168                SearchResult::Exact(RowIdTreeMap::from_iter(((idx as u64)..1000).step_by(7)))
2169            );
2170        }
2171    }
2172
2173    #[tokio::test]
2174    async fn test_page_cache() {
2175        let tmpdir = TempObjDir::default();
2176        let test_store = Arc::new(LanceIndexStore::new(
2177            Arc::new(ObjectStore::local()),
2178            tmpdir.clone(),
2179            Arc::new(LanceCache::no_cache()),
2180        ));
2181
2182        let data = gen_batch()
2183            .col("value", array::step::<Float32Type>())
2184            .col("_rowid", array::step::<UInt64Type>())
2185            .into_df_exec(RowCount::from(1000), BatchCount::from(10));
2186        let schema = data.schema();
2187        let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2188        let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2189        let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2190        let stream = break_stream(stream, 64);
2191        let stream = stream.map_err(DataFusionError::from);
2192        let stream =
2193            Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2194        let sub_index_trainer = FlatIndexMetadata::new(DataType::Float32);
2195
2196        train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 64, None)
2197            .await
2198            .unwrap();
2199
2200        let cache = Arc::new(LanceCache::with_capacity(100 * 1024 * 1024));
2201        let index = BTreeIndex::load(test_store, None, cache.as_ref())
2202            .await
2203            .unwrap();
2204
2205        let query = SargableQuery::Equals(ScalarValue::Float32(Some(0.0)));
2206        let metrics = LocalMetricsCollector::default();
2207        let query1 = index.search(&query, &metrics);
2208        let query2 = index.search(&query, &metrics);
2209        tokio::join!(query1, query2).0.unwrap();
2210        assert_eq!(metrics.parts_loaded.load(Ordering::Relaxed), 1);
2211    }
2212
2213    #[tokio::test]
2214    async fn test_fragment_btree_index_consistency() {
2215        // Setup stores for both indexes
2216        let full_tmpdir = TempObjDir::default();
2217        let full_store = Arc::new(LanceIndexStore::new(
2218            Arc::new(ObjectStore::local()),
2219            full_tmpdir.clone(),
2220            Arc::new(LanceCache::no_cache()),
2221        ));
2222
2223        let fragment_tmpdir = TempObjDir::default();
2224        let fragment_store = Arc::new(LanceIndexStore::new(
2225            Arc::new(ObjectStore::local()),
2226            fragment_tmpdir.clone(),
2227            Arc::new(LanceCache::no_cache()),
2228        ));
2229
2230        let sub_index_trainer = FlatIndexMetadata::new(DataType::Int32);
2231
2232        // Method 1: Build complete index directly using the same data
2233        // Create deterministic data for comparison - use 2 * DEFAULT_BTREE_BATCH_SIZE for testing
2234        let total_count = 2 * DEFAULT_BTREE_BATCH_SIZE;
2235        let full_data_gen = gen_batch()
2236            .col("value", array::step::<Int32Type>())
2237            .col("_rowid", array::step::<UInt64Type>())
2238            .into_df_stream(RowCount::from(total_count / 2), BatchCount::from(2));
2239        let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
2240            full_data_gen.schema(),
2241            full_data_gen,
2242        ));
2243
2244        train_btree_index(
2245            full_data_source,
2246            &sub_index_trainer,
2247            full_store.as_ref(),
2248            DEFAULT_BTREE_BATCH_SIZE,
2249            None,
2250        )
2251        .await
2252        .unwrap();
2253
2254        // Method 2: Build fragment-based index using the same data split into fragments
2255        // Create fragment 1 index - first half of the data (0 to DEFAULT_BTREE_BATCH_SIZE-1)
2256        let half_count = DEFAULT_BTREE_BATCH_SIZE;
2257        let fragment1_gen = gen_batch()
2258            .col("value", array::step::<Int32Type>())
2259            .col("_rowid", array::step::<UInt64Type>())
2260            .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
2261        let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
2262            fragment1_gen.schema(),
2263            fragment1_gen,
2264        ));
2265
2266        train_btree_index(
2267            fragment1_data_source,
2268            &sub_index_trainer,
2269            fragment_store.as_ref(),
2270            DEFAULT_BTREE_BATCH_SIZE,
2271            Some(vec![1]), // fragment_id = 1
2272        )
2273        .await
2274        .unwrap();
2275
2276        // Create fragment 2 index - second half of the data (DEFAULT_BTREE_BATCH_SIZE to 2*DEFAULT_BTREE_BATCH_SIZE-1)
2277        let start_val = DEFAULT_BTREE_BATCH_SIZE as i32;
2278        let end_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2279        let values_second_half: Vec<i32> = (start_val..end_val).collect();
2280        let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
2281        let fragment2_gen = gen_batch()
2282            .col("value", array::cycle::<Int32Type>(values_second_half))
2283            .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
2284            .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
2285        let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
2286            fragment2_gen.schema(),
2287            fragment2_gen,
2288        ));
2289
2290        train_btree_index(
2291            fragment2_data_source,
2292            &sub_index_trainer,
2293            fragment_store.as_ref(),
2294            DEFAULT_BTREE_BATCH_SIZE,
2295            Some(vec![2]), // fragment_id = 2
2296        )
2297        .await
2298        .unwrap();
2299
2300        // Merge the fragment files
2301        let part_page_files = vec![
2302            part_page_data_file_path(1 << 32),
2303            part_page_data_file_path(2 << 32),
2304        ];
2305
2306        let part_lookup_files = vec![
2307            part_lookup_file_path(1 << 32),
2308            part_lookup_file_path(2 << 32),
2309        ];
2310
2311        super::merge_metadata_files(
2312            fragment_store.clone(),
2313            &part_page_files,
2314            &part_lookup_files,
            Some(1),
2316        )
2317        .await
2318        .unwrap();
2319
2320        // Load both indexes
2321        let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
2322            .await
2323            .unwrap();
2324
2325        let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
2326            .await
2327            .unwrap();
2328
        // Run the test queries one by one so any mismatch pinpoints the failing case
2330
2331        // Test 1: Query for value 0 (should be in first page)
2332        let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
2333        let full_result_0 = full_index
2334            .search(&query_0, &NoOpMetricsCollector)
2335            .await
2336            .unwrap();
2337        let merged_result_0 = merged_index
2338            .search(&query_0, &NoOpMetricsCollector)
2339            .await
2340            .unwrap();
2341        assert_eq!(full_result_0, merged_result_0, "Query for value 0 failed");
2342
2343        // Test 2: Query for value in middle of first batch (should be in first page)
2344        let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
2345        let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
2346        let full_result_mid_first = full_index
2347            .search(&query_mid_first, &NoOpMetricsCollector)
2348            .await
2349            .unwrap();
2350        let merged_result_mid_first = merged_index
2351            .search(&query_mid_first, &NoOpMetricsCollector)
2352            .await
2353            .unwrap();
2354        assert_eq!(
2355            full_result_mid_first, merged_result_mid_first,
2356            "Query for value {} failed",
2357            mid_first_batch
2358        );
2359
2360        // Test 3: Query for first value in second batch (should be in second page)
2361        let first_second_batch = DEFAULT_BTREE_BATCH_SIZE as i32;
2362        let query_first_second =
2363            SargableQuery::Equals(ScalarValue::Int32(Some(first_second_batch)));
2364        let full_result_first_second = full_index
2365            .search(&query_first_second, &NoOpMetricsCollector)
2366            .await
2367            .unwrap();
2368        let merged_result_first_second = merged_index
2369            .search(&query_first_second, &NoOpMetricsCollector)
2370            .await
2371            .unwrap();
2372        assert_eq!(
2373            full_result_first_second, merged_result_first_second,
2374            "Query for value {} failed",
2375            first_second_batch
2376        );
2377
2378        // Test 4: Query for value in middle of second batch (should be in second page)
2379        let mid_second_batch = (DEFAULT_BTREE_BATCH_SIZE + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
2380        let query_mid_second = SargableQuery::Equals(ScalarValue::Int32(Some(mid_second_batch)));
2381
2382        let full_result_mid_second = full_index
2383            .search(&query_mid_second, &NoOpMetricsCollector)
2384            .await
2385            .unwrap();
2386        let merged_result_mid_second = merged_index
2387            .search(&query_mid_second, &NoOpMetricsCollector)
2388            .await
2389            .unwrap();
2390        assert_eq!(
2391            full_result_mid_second, merged_result_mid_second,
2392            "Query for value {} failed",
2393            mid_second_batch
2394        );
2395    }
2396
2397    #[tokio::test]
2398    async fn test_fragment_btree_index_boundary_queries() {
2399        // Setup stores for both indexes
2400        let full_tmpdir = TempObjDir::default();
2401        let full_store = Arc::new(LanceIndexStore::new(
2402            Arc::new(ObjectStore::local()),
2403            full_tmpdir.clone(),
2404            Arc::new(LanceCache::no_cache()),
2405        ));
2406
2407        let fragment_tmpdir = TempObjDir::default();
2408        let fragment_store = Arc::new(LanceIndexStore::new(
2409            Arc::new(ObjectStore::local()),
2410            fragment_tmpdir.clone(),
2411            Arc::new(LanceCache::no_cache()),
2412        ));
2413
2414        let sub_index_trainer = FlatIndexMetadata::new(DataType::Int32);
2415
2416        // Use 3 * DEFAULT_BTREE_BATCH_SIZE for more comprehensive boundary testing
2417        let total_count = 3 * DEFAULT_BTREE_BATCH_SIZE;
2418
2419        // Method 1: Build complete index directly
2420        let full_data_gen = gen_batch()
2421            .col("value", array::step::<Int32Type>())
2422            .col("_rowid", array::step::<UInt64Type>())
2423            .into_df_stream(RowCount::from(total_count / 3), BatchCount::from(3));
2424        let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
2425            full_data_gen.schema(),
2426            full_data_gen,
2427        ));
2428
2429        train_btree_index(
2430            full_data_source,
2431            &sub_index_trainer,
2432            full_store.as_ref(),
2433            DEFAULT_BTREE_BATCH_SIZE,
2434            None,
2435        )
2436        .await
2437        .unwrap();
2438
2439        // Method 2: Build fragment-based index using 3 fragments
2440        // Fragment 1: 0 to DEFAULT_BTREE_BATCH_SIZE-1
2441        let fragment_size = DEFAULT_BTREE_BATCH_SIZE;
2442        let fragment1_gen = gen_batch()
2443            .col("value", array::step::<Int32Type>())
2444            .col("_rowid", array::step::<UInt64Type>())
2445            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2446        let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
2447            fragment1_gen.schema(),
2448            fragment1_gen,
2449        ));
2450
2451        train_btree_index(
2452            fragment1_data_source,
2453            &sub_index_trainer,
2454            fragment_store.as_ref(),
2455            DEFAULT_BTREE_BATCH_SIZE,
2456            Some(vec![1]),
2457        )
2458        .await
2459        .unwrap();
2460
2461        // Fragment 2: DEFAULT_BTREE_BATCH_SIZE to 2*DEFAULT_BTREE_BATCH_SIZE-1
2462        let start_val2 = DEFAULT_BTREE_BATCH_SIZE as i32;
2463        let end_val2 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2464        let values_fragment2: Vec<i32> = (start_val2..end_val2).collect();
2465        let row_ids_fragment2: Vec<u64> = (start_val2 as u64..end_val2 as u64).collect();
2466        let fragment2_gen = gen_batch()
2467            .col("value", array::cycle::<Int32Type>(values_fragment2))
2468            .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment2))
2469            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2470        let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
2471            fragment2_gen.schema(),
2472            fragment2_gen,
2473        ));
2474
2475        train_btree_index(
2476            fragment2_data_source,
2477            &sub_index_trainer,
2478            fragment_store.as_ref(),
2479            DEFAULT_BTREE_BATCH_SIZE,
2480            Some(vec![2]),
2481        )
2482        .await
2483        .unwrap();
2484
2485        // Fragment 3: 2*DEFAULT_BTREE_BATCH_SIZE to 3*DEFAULT_BTREE_BATCH_SIZE-1
2486        let start_val3 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2487        let end_val3 = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2488        let values_fragment3: Vec<i32> = (start_val3..end_val3).collect();
2489        let row_ids_fragment3: Vec<u64> = (start_val3 as u64..end_val3 as u64).collect();
2490        let fragment3_gen = gen_batch()
2491            .col("value", array::cycle::<Int32Type>(values_fragment3))
2492            .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment3))
2493            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2494        let fragment3_data_source = Box::pin(RecordBatchStreamAdapter::new(
2495            fragment3_gen.schema(),
2496            fragment3_gen,
2497        ));
2498
2499        train_btree_index(
2500            fragment3_data_source,
2501            &sub_index_trainer,
2502            fragment_store.as_ref(),
2503            DEFAULT_BTREE_BATCH_SIZE,
2504            Some(vec![3]),
2505        )
2506        .await
2507        .unwrap();
2508
2509        // Merge all fragment files
2510        let part_page_files = vec![
2511            part_page_data_file_path(1 << 32),
2512            part_page_data_file_path(2 << 32),
2513            part_page_data_file_path(3 << 32),
2514        ];
2515
2516        let part_lookup_files = vec![
2517            part_lookup_file_path(1 << 32),
2518            part_lookup_file_path(2 << 32),
2519            part_lookup_file_path(3 << 32),
2520        ];
2521
2522        super::merge_metadata_files(
2523            fragment_store.clone(),
2524            &part_page_files,
2525            &part_lookup_files,
            Some(1),
2527        )
2528        .await
2529        .unwrap();
2530
2531        // Load both indexes
2532        let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
2533            .await
2534            .unwrap();
2535
2536        let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
2537            .await
2538            .unwrap();
2539
2540        // === Boundary Value Tests ===
2541
2542        // Test 1: Query minimum value (boundary: data start)
2543        let query_min = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
2544        let full_result_min = full_index
2545            .search(&query_min, &NoOpMetricsCollector)
2546            .await
2547            .unwrap();
2548        let merged_result_min = merged_index
2549            .search(&query_min, &NoOpMetricsCollector)
2550            .await
2551            .unwrap();
2552        assert_eq!(
2553            full_result_min, merged_result_min,
2554            "Query for minimum value 0 failed"
2555        );
2556
2557        // Test 2: Query maximum value (boundary: data end)
2558        let max_val = (3 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2559        let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
2560        let full_result_max = full_index
2561            .search(&query_max, &NoOpMetricsCollector)
2562            .await
2563            .unwrap();
2564        let merged_result_max = merged_index
2565            .search(&query_max, &NoOpMetricsCollector)
2566            .await
2567            .unwrap();
2568        assert_eq!(
2569            full_result_max, merged_result_max,
2570            "Query for maximum value {} failed",
2571            max_val
2572        );
2573
2574        // Test 3: Query fragment boundary value (last value of first fragment)
2575        let fragment1_last = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2576        let query_frag1_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment1_last)));
2577        let full_result_frag1_last = full_index
2578            .search(&query_frag1_last, &NoOpMetricsCollector)
2579            .await
2580            .unwrap();
2581        let merged_result_frag1_last = merged_index
2582            .search(&query_frag1_last, &NoOpMetricsCollector)
2583            .await
2584            .unwrap();
2585        assert_eq!(
2586            full_result_frag1_last, merged_result_frag1_last,
2587            "Query for fragment 1 last value {} failed",
2588            fragment1_last
2589        );
2590
2591        // Test 4: Query fragment boundary value (first value of second fragment)
2592        let fragment2_first = DEFAULT_BTREE_BATCH_SIZE as i32;
2593        let query_frag2_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_first)));
2594        let full_result_frag2_first = full_index
2595            .search(&query_frag2_first, &NoOpMetricsCollector)
2596            .await
2597            .unwrap();
2598        let merged_result_frag2_first = merged_index
2599            .search(&query_frag2_first, &NoOpMetricsCollector)
2600            .await
2601            .unwrap();
2602        assert_eq!(
2603            full_result_frag2_first, merged_result_frag2_first,
2604            "Query for fragment 2 first value {} failed",
2605            fragment2_first
2606        );
2607
2608        // Test 5: Query fragment boundary value (last value of second fragment)
2609        let fragment2_last = (2 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2610        let query_frag2_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_last)));
2611        let full_result_frag2_last = full_index
2612            .search(&query_frag2_last, &NoOpMetricsCollector)
2613            .await
2614            .unwrap();
2615        let merged_result_frag2_last = merged_index
2616            .search(&query_frag2_last, &NoOpMetricsCollector)
2617            .await
2618            .unwrap();
2619        assert_eq!(
2620            full_result_frag2_last, merged_result_frag2_last,
2621            "Query for fragment 2 last value {} failed",
2622            fragment2_last
2623        );
2624
2625        // Test 6: Query fragment boundary value (first value of third fragment)
2626        let fragment3_first = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2627        let query_frag3_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment3_first)));
2628        let full_result_frag3_first = full_index
2629            .search(&query_frag3_first, &NoOpMetricsCollector)
2630            .await
2631            .unwrap();
2632        let merged_result_frag3_first = merged_index
2633            .search(&query_frag3_first, &NoOpMetricsCollector)
2634            .await
2635            .unwrap();
2636        assert_eq!(
2637            full_result_frag3_first, merged_result_frag3_first,
2638            "Query for fragment 3 first value {} failed",
2639            fragment3_first
2640        );
2641
2642        // === Non-existent Value Tests ===
2643
2644        // Test 7: Query value below minimum
2645        let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
2646        let full_result_below = full_index
2647            .search(&query_below_min, &NoOpMetricsCollector)
2648            .await
2649            .unwrap();
2650        let merged_result_below = merged_index
2651            .search(&query_below_min, &NoOpMetricsCollector)
2652            .await
2653            .unwrap();
2654        assert_eq!(
2655            full_result_below, merged_result_below,
2656            "Query for value below minimum (-1) failed"
2657        );
2658
2659        // Test 8: Query value above maximum
2660        let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
2661        let full_result_above = full_index
2662            .search(&query_above_max, &NoOpMetricsCollector)
2663            .await
2664            .unwrap();
2665        let merged_result_above = merged_index
2666            .search(&query_above_max, &NoOpMetricsCollector)
2667            .await
2668            .unwrap();
2669        assert_eq!(
2670            full_result_above,
2671            merged_result_above,
2672            "Query for value above maximum ({}) failed",
2673            max_val + 1
2674        );
2675
2676        // === Range Query Tests ===
2677
2678        // Test 9: Cross-fragment range query (from first fragment to second fragment)
2679        let range_start = (DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
2680        let range_end = (DEFAULT_BTREE_BATCH_SIZE + 100) as i32;
2681        let query_cross_frag = SargableQuery::Range(
2682            std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
2683            std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
2684        );
2685        let full_result_cross = full_index
2686            .search(&query_cross_frag, &NoOpMetricsCollector)
2687            .await
2688            .unwrap();
2689        let merged_result_cross = merged_index
2690            .search(&query_cross_frag, &NoOpMetricsCollector)
2691            .await
2692            .unwrap();
2693        assert_eq!(
2694            full_result_cross, merged_result_cross,
2695            "Cross-fragment range query [{}, {}] failed",
2696            range_start, range_end
2697        );
2698
2699        // Test 10: Range query within single fragment
2700        let single_frag_start = 100i32;
2701        let single_frag_end = 200i32;
2702        let query_single_frag = SargableQuery::Range(
2703            std::collections::Bound::Included(ScalarValue::Int32(Some(single_frag_start))),
2704            std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_frag_end))),
2705        );
2706        let full_result_single = full_index
2707            .search(&query_single_frag, &NoOpMetricsCollector)
2708            .await
2709            .unwrap();
2710        let merged_result_single = merged_index
2711            .search(&query_single_frag, &NoOpMetricsCollector)
2712            .await
2713            .unwrap();
2714        assert_eq!(
2715            full_result_single, merged_result_single,
2716            "Single fragment range query [{}, {}] failed",
2717            single_frag_start, single_frag_end
2718        );
2719
2720        // Test 11: Large range query spanning all fragments
2721        let large_range_start = 100i32;
2722        let large_range_end = (3 * DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
2723        let query_large_range = SargableQuery::Range(
2724            std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
2725            std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
2726        );
2727        let full_result_large = full_index
2728            .search(&query_large_range, &NoOpMetricsCollector)
2729            .await
2730            .unwrap();
2731        let merged_result_large = merged_index
2732            .search(&query_large_range, &NoOpMetricsCollector)
2733            .await
2734            .unwrap();
2735        assert_eq!(
2736            full_result_large, merged_result_large,
2737            "Large range query [{}, {}] failed",
2738            large_range_start, large_range_end
2739        );
2740
2741        // === Range Boundary Query Tests ===
2742
2743        // Test 12: Less than query (implemented using range query, from minimum to specified value)
2744        let lt_val = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
2745        let query_lt = SargableQuery::Range(
2746            std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
2747            std::collections::Bound::Excluded(ScalarValue::Int32(Some(lt_val))),
2748        );
2749        let full_result_lt = full_index
2750            .search(&query_lt, &NoOpMetricsCollector)
2751            .await
2752            .unwrap();
2753        let merged_result_lt = merged_index
2754            .search(&query_lt, &NoOpMetricsCollector)
2755            .await
2756            .unwrap();
2757        assert_eq!(
2758            full_result_lt, merged_result_lt,
2759            "Less than query (<{}) failed",
2760            lt_val
2761        );
2762
2763        // Test 13: Greater than query (implemented using range query, from specified value to maximum)
2764        let gt_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2765        let max_range_val = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2766        let query_gt = SargableQuery::Range(
2767            std::collections::Bound::Excluded(ScalarValue::Int32(Some(gt_val))),
2768            std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
2769        );
2770        let full_result_gt = full_index
2771            .search(&query_gt, &NoOpMetricsCollector)
2772            .await
2773            .unwrap();
2774        let merged_result_gt = merged_index
2775            .search(&query_gt, &NoOpMetricsCollector)
2776            .await
2777            .unwrap();
2778        assert_eq!(
2779            full_result_gt, merged_result_gt,
2780            "Greater than query (>{}) failed",
2781            gt_val
2782        );
2783
2784        // Test 14: Less than or equal query (implemented using range query, including boundary value)
2785        let lte_val = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2786        let query_lte = SargableQuery::Range(
2787            std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
2788            std::collections::Bound::Included(ScalarValue::Int32(Some(lte_val))),
2789        );
2790        let full_result_lte = full_index
2791            .search(&query_lte, &NoOpMetricsCollector)
2792            .await
2793            .unwrap();
2794        let merged_result_lte = merged_index
2795            .search(&query_lte, &NoOpMetricsCollector)
2796            .await
2797            .unwrap();
2798        assert_eq!(
2799            full_result_lte, merged_result_lte,
2800            "Less than or equal query (<={}) failed",
2801            lte_val
2802        );
2803
2804        // Test 15: Greater than or equal query (implemented using range query, including boundary value)
2805        let gte_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2806        let query_gte = SargableQuery::Range(
2807            std::collections::Bound::Included(ScalarValue::Int32(Some(gte_val))),
2808            std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
2809        );
2810        let full_result_gte = full_index
2811            .search(&query_gte, &NoOpMetricsCollector)
2812            .await
2813            .unwrap();
2814        let merged_result_gte = merged_index
2815            .search(&query_gte, &NoOpMetricsCollector)
2816            .await
2817            .unwrap();
2818        assert_eq!(
2819            full_result_gte, merged_result_gte,
2820            "Greater than or equal query (>={}) failed",
2821            gte_val
2822        );
2823    }
2824
2825    #[test]
2826    fn test_extract_partition_id() {
2827        // Test valid partition file names
2828        assert_eq!(
2829            super::extract_partition_id("part_123_page_data.lance").unwrap(),
2830            123
2831        );
2832        assert_eq!(
2833            super::extract_partition_id("part_456_page_lookup.lance").unwrap(),
2834            456
2835        );
2836        assert_eq!(
2837            super::extract_partition_id("part_4294967296_page_data.lance").unwrap(),
2838            4294967296
2839        );
2840
2841        // Test invalid file names
2842        assert!(super::extract_partition_id("invalid_filename.lance").is_err());
2843        assert!(super::extract_partition_id("part_abc_page_data.lance").is_err());
2844        assert!(super::extract_partition_id("part_123").is_err());
2845        assert!(super::extract_partition_id("part_").is_err());
2846    }
2847
2848    #[tokio::test]
2849    async fn test_cleanup_partition_files() {
2850        // Create a test store
2851        let tmpdir = TempObjDir::default();
2852        let test_store: Arc<dyn crate::scalar::IndexStore> = Arc::new(LanceIndexStore::new(
2853            Arc::new(ObjectStore::local()),
2854            tmpdir.clone(),
2855            Arc::new(LanceCache::no_cache()),
2856        ));
2857
2858        // Test files with different patterns
2859        let lookup_files = vec![
2860            "part_123_page_lookup.lance".to_string(),
2861            "invalid_lookup_file.lance".to_string(),
2862            "part_456_page_lookup.lance".to_string(),
2863        ];
2864
2865        let page_files = vec![
2866            "part_123_page_data.lance".to_string(),
2867            "invalid_page_file.lance".to_string(),
2868            "part_456_page_data.lance".to_string(),
2869        ];
2870
2871        // The cleanup function should handle both valid and invalid file patterns gracefully
2872        // This test mainly verifies that the function doesn't panic and handles edge cases
2873        super::cleanup_partition_files(&test_store, &lookup_files, &page_files).await;
2874    }
2875}