Skip to main content

lance_index/scalar/
btree.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{
5    any::Any,
6    cmp::Ordering,
7    collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
8    fmt::{Debug, Display},
9    ops::Bound,
10    sync::Arc,
11};
12
13use super::{
14    AnyQuery, BuiltinIndexType, IndexReader, IndexStore, IndexWriter, MetricsCollector,
15    OldIndexDataFilter, SargableQuery, ScalarIndex, ScalarIndexParams, SearchResult,
16    compute_next_prefix,
17};
18use crate::{Index, IndexType};
19use crate::{
20    frag_reuse::FragReuseIndex,
21    progress::{IndexBuildProgress, noop_progress},
22    scalar::{
23        CreatedIndex, UpdateCriteria,
24        expression::{SargableQueryParser, ScalarQueryParser},
25        registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},
26    },
27};
28use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
29use crate::{pbold, scalar::btree::flat::FlatIndex};
30use arrow_arith::numeric::add;
31use arrow_array::{Array, RecordBatch, UInt32Array, new_empty_array};
32use arrow_schema::{DataType, Field, Schema, SortOptions};
33use async_trait::async_trait;
34use datafusion::physical_plan::{
35    ExecutionPlan, SendableRecordBatchStream,
36    sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
37    union::UnionExec,
38};
39use datafusion_common::{DataFusionError, ScalarValue};
40use datafusion_physical_expr::{PhysicalSortExpr, expressions::Column};
41use deepsize::DeepSizeOf;
42use futures::{
43    FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
44    future::BoxFuture,
45    stream::{self},
46};
47use lance_core::{
48    Error, ROW_ID, Result,
49    cache::{CacheKey, LanceCache, WeakLanceCache},
50    error::LanceOptionExt,
51    utils::{
52        mask::NullableRowAddrSet,
53        tokio::get_num_compute_intensive_cpus,
54        tracing::{IO_TYPE_LOAD_SCALAR_PART, TRACE_IO_EVENTS},
55    },
56};
57use lance_datafusion::{
58    chunker::chunk_concat_stream,
59    exec::{LanceExecutionOptions, OneShotExec, execute_plan},
60};
61use lance_io::object_store::ObjectStore;
62use log::{debug, warn};
63use object_store::path::Path;
64use rangemap::RangeInclusiveMap;
65use roaring::RoaringBitmap;
66use serde::{Deserialize, Serialize, Serializer};
67use tracing::{info, instrument};
68
69mod flat;
70
/// File name for the page lookup data (presumably the per-page summary file — confirm against the writer).
const BTREE_LOOKUP_NAME: &str = "page_lookup.lance";
/// File name for the page data (presumably the actual value/row-id pages — confirm against the writer).
const BTREE_PAGES_NAME: &str = "page_data.lance";
/// Default batch size for a btree index.
pub const DEFAULT_BTREE_BATCH_SIZE: u64 = 4096;
/// Metadata key under which the batch size is stored.
const BATCH_SIZE_META_KEY: &str = "batch_size";
/// Default for the "range partitioned" setting.
const DEFAULT_RANGE_PARTITIONED: bool = false;
/// Metadata key under which the "range partitioned" flag is stored.
const RANGE_PARTITIONED_META_KEY: &str = "range_partitioned";
/// Metadata key under which the number of pages per range partition is stored.
const PAGE_NUM_PER_RANGE_PARTITION_META_KEY: &str = "page_num_per_range_partition";
/// Version number of the btree index (NOTE(review): presumably the on-disk format version — confirm).
const BTREE_INDEX_VERSION: u32 = 0;
/// Name of the column holding the indexed values.
pub(crate) const BTREE_VALUES_COLUMN: &str = "values";
/// Name of the column holding the ids (presumably row ids — confirm against the schema).
pub(crate) const BTREE_IDS_COLUMN: &str = "ids";
81
/// Wraps a ScalarValue and implements Ord (ScalarValue only implements PartialOrd)
///
/// This newtype exists so that scalars can be used as keys of ordered
/// collections such as `BTreeMap`, which require a total order.  The wrapped
/// value is public and can be accessed as `.0`.
#[derive(Clone, Debug)]
pub struct OrderableScalarValue(pub ScalarValue);
85
86impl DeepSizeOf for OrderableScalarValue {
87    fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
88        // deepsize and size both factor in the size of the ScalarValue
89        self.0.size() - std::mem::size_of::<ScalarValue>()
90    }
91}
92
93impl Display for OrderableScalarValue {
94    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95        std::fmt::Display::fmt(&self.0, f)
96    }
97}
98
99impl PartialEq for OrderableScalarValue {
100    fn eq(&self, other: &Self) -> bool {
101        self.0.eq(&other.0)
102    }
103}
104
// Marker impl: `Ord` has `Eq` as a supertrait, so it must be declared for the
// wrapper to be usable as an ordered key.
impl Eq for OrderableScalarValue {}
106
107impl PartialOrd for OrderableScalarValue {
108    fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
109        Some(self.cmp(other))
110    }
111}
112
113// manual implementation of `Ord` that panics when asked to compare scalars of different type
114// and always puts nulls before non-nulls (this is consistent with Option<T>'s implementation
115// of Ord)
116//
117// TODO: Consider upstreaming this
118impl Ord for OrderableScalarValue {
119    fn cmp(&self, other: &Self) -> Ordering {
120        use ScalarValue::*;
121        // This purposely doesn't have a catch-all "(_, _)" so that
122        // any newly added enum variant will require editing this list
123        // or else face a compile error
124        match (&self.0, &other.0) {
125            (Decimal32(v1, p1, s1), Decimal32(v2, p2, s2)) => {
126                if p1.eq(p2) && s1.eq(s2) {
127                    v1.cmp(v2)
128                } else {
129                    // Two decimal values can only be compared if they have the same precision and scale.
130                    panic!("Attempt to compare decimals with unequal precision / scale")
131                }
132            }
133            (Decimal32(v1, _, _), Null) => {
134                if v1.is_none() {
135                    Ordering::Equal
136                } else {
137                    Ordering::Greater
138                }
139            }
140            (Decimal32(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
141            (Decimal64(v1, p1, s1), Decimal64(v2, p2, s2)) => {
142                if p1.eq(p2) && s1.eq(s2) {
143                    v1.cmp(v2)
144                } else {
145                    // Two decimal values can only be compared if they have the same precision and scale.
146                    panic!("Attempt to compare decimals with unequal precision / scale")
147                }
148            }
149            (Decimal64(v1, _, _), Null) => {
150                if v1.is_none() {
151                    Ordering::Equal
152                } else {
153                    Ordering::Greater
154                }
155            }
156            (Decimal64(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
157            (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
158                if p1.eq(p2) && s1.eq(s2) {
159                    v1.cmp(v2)
160                } else {
161                    // Two decimal values can only be compared if they have the same precision and scale.
162                    panic!("Attempt to compare decimals with unequal precision / scale")
163                }
164            }
165            (Decimal128(v1, _, _), Null) => {
166                if v1.is_none() {
167                    Ordering::Equal
168                } else {
169                    Ordering::Greater
170                }
171            }
172            (Decimal128(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
173            (Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
174                if p1.eq(p2) && s1.eq(s2) {
175                    v1.cmp(v2)
176                } else {
177                    // Two decimal values can only be compared if they have the same precision and scale.
178                    panic!("Attempt to compare decimals with unequal precision / scale")
179                }
180            }
181            (Decimal256(v1, _, _), Null) => {
182                if v1.is_none() {
183                    Ordering::Equal
184                } else {
185                    Ordering::Greater
186                }
187            }
188            (Decimal256(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
189
190            (Boolean(v1), Boolean(v2)) => v1.cmp(v2),
191            (Boolean(v1), Null) => {
192                if v1.is_none() {
193                    Ordering::Equal
194                } else {
195                    Ordering::Greater
196                }
197            }
198            (Boolean(_), _) => panic!("Attempt to compare boolean with non-boolean"),
199            (Float32(v1), Float32(v2)) => match (v1, v2) {
200                (Some(f1), Some(f2)) => f1.total_cmp(f2),
201                (None, Some(_)) => Ordering::Less,
202                (Some(_), None) => Ordering::Greater,
203                (None, None) => Ordering::Equal,
204            },
205            (Float32(v1), Null) => {
206                if v1.is_none() {
207                    Ordering::Equal
208                } else {
209                    Ordering::Greater
210                }
211            }
212            (Float32(_), _) => panic!("Attempt to compare f32 with non-f32"),
213            (Float64(v1), Float64(v2)) => match (v1, v2) {
214                (Some(f1), Some(f2)) => f1.total_cmp(f2),
215                (None, Some(_)) => Ordering::Less,
216                (Some(_), None) => Ordering::Greater,
217                (None, None) => Ordering::Equal,
218            },
219            (Float64(v1), Null) => {
220                if v1.is_none() {
221                    Ordering::Equal
222                } else {
223                    Ordering::Greater
224                }
225            }
226            (Float64(_), _) => panic!("Attempt to compare f64 with non-f64"),
227            (Float16(v1), Float16(v2)) => match (v1, v2) {
228                (Some(f1), Some(f2)) => f1.total_cmp(f2),
229                (None, Some(_)) => Ordering::Less,
230                (Some(_), None) => Ordering::Greater,
231                (None, None) => Ordering::Equal,
232            },
233            (Float16(v1), Null) => {
234                if v1.is_none() {
235                    Ordering::Equal
236                } else {
237                    Ordering::Greater
238                }
239            }
240            (Float16(_), _) => panic!("Attempt to compare f16 with non-f16"),
241            (Int8(v1), Int8(v2)) => v1.cmp(v2),
242            (Int8(v1), Null) => {
243                if v1.is_none() {
244                    Ordering::Equal
245                } else {
246                    Ordering::Greater
247                }
248            }
249            (Int8(_), _) => panic!("Attempt to compare Int8 with non-Int8"),
250            (Int16(v1), Int16(v2)) => v1.cmp(v2),
251            (Int16(v1), Null) => {
252                if v1.is_none() {
253                    Ordering::Equal
254                } else {
255                    Ordering::Greater
256                }
257            }
258            (Int16(_), _) => panic!("Attempt to compare Int16 with non-Int16"),
259            (Int32(v1), Int32(v2)) => v1.cmp(v2),
260            (Int32(v1), Null) => {
261                if v1.is_none() {
262                    Ordering::Equal
263                } else {
264                    Ordering::Greater
265                }
266            }
267            (Int32(_), _) => panic!("Attempt to compare Int32 with non-Int32"),
268            (Int64(v1), Int64(v2)) => v1.cmp(v2),
269            (Int64(v1), Null) => {
270                if v1.is_none() {
271                    Ordering::Equal
272                } else {
273                    Ordering::Greater
274                }
275            }
276            (Int64(_), _) => panic!("Attempt to compare Int64 with non-Int64"),
277            (UInt8(v1), UInt8(v2)) => v1.cmp(v2),
278            (UInt8(v1), Null) => {
279                if v1.is_none() {
280                    Ordering::Equal
281                } else {
282                    Ordering::Greater
283                }
284            }
285            (UInt8(_), _) => panic!("Attempt to compare UInt8 with non-UInt8"),
286            (UInt16(v1), UInt16(v2)) => v1.cmp(v2),
287            (UInt16(v1), Null) => {
288                if v1.is_none() {
289                    Ordering::Equal
290                } else {
291                    Ordering::Greater
292                }
293            }
294            (UInt16(_), _) => panic!("Attempt to compare UInt16 with non-UInt16"),
295            (UInt32(v1), UInt32(v2)) => v1.cmp(v2),
296            (UInt32(v1), Null) => {
297                if v1.is_none() {
298                    Ordering::Equal
299                } else {
300                    Ordering::Greater
301                }
302            }
303            (UInt32(_), _) => panic!("Attempt to compare UInt32 with non-UInt32"),
304            (UInt64(v1), UInt64(v2)) => v1.cmp(v2),
305            (UInt64(v1), Null) => {
306                if v1.is_none() {
307                    Ordering::Equal
308                } else {
309                    Ordering::Greater
310                }
311            }
312            (UInt64(_), _) => panic!("Attempt to compare UInt64 with non-UInt64"),
313            (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Utf8(v2) | Utf8View(v2) | LargeUtf8(v2)) => {
314                v1.cmp(v2)
315            }
316            (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Null) => {
317                if v1.is_none() {
318                    Ordering::Equal
319                } else {
320                    Ordering::Greater
321                }
322            }
323            (Utf8(_) | Utf8View(_) | LargeUtf8(_), _) => {
324                panic!("Attempt to compare Utf8 with non-Utf8")
325            }
326            (
327                Binary(v1) | LargeBinary(v1) | BinaryView(v1),
328                Binary(v2) | LargeBinary(v2) | BinaryView(v2),
329            ) => v1.cmp(v2),
330            (Binary(v1) | LargeBinary(v1) | BinaryView(v1), Null) => {
331                if v1.is_none() {
332                    Ordering::Equal
333                } else {
334                    Ordering::Greater
335                }
336            }
337            (Binary(_) | LargeBinary(_) | BinaryView(_), _) => {
338                panic!("Attempt to compare Binary with non-Binary")
339            }
340            (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.cmp(v2),
341            (FixedSizeBinary(_, v1), Null) => {
342                if v1.is_none() {
343                    Ordering::Equal
344                } else {
345                    Ordering::Greater
346                }
347            }
348            (FixedSizeBinary(_, _), _) => {
349                panic!("Attempt to compare FixedSizeBinary with non-FixedSizeBinary")
350            }
351            (FixedSizeList(left), FixedSizeList(right)) => {
352                if left.eq(right) {
353                    todo!()
354                } else {
355                    panic!(
356                        "Attempt to compare fixed size list elements with different widths/fields"
357                    )
358                }
359            }
360            (FixedSizeList(left), Null) => {
361                if left.is_null(0) {
362                    Ordering::Equal
363                } else {
364                    Ordering::Greater
365                }
366            }
367            (FixedSizeList(_), _) => {
368                panic!("Attempt to compare FixedSizeList with non-FixedSizeList")
369            }
370            (List(_), List(_)) => todo!(),
371            (List(left), Null) => {
372                if left.is_null(0) {
373                    Ordering::Equal
374                } else {
375                    Ordering::Greater
376                }
377            }
378            (List(_), _) => {
379                panic!("Attempt to compare List with non-List")
380            }
381            (LargeList(_), _) => todo!(),
382            (Map(_), Map(_)) => todo!(),
383            (Map(left), Null) => {
384                if left.is_null(0) {
385                    Ordering::Equal
386                } else {
387                    Ordering::Greater
388                }
389            }
390            (Map(_), _) => {
391                panic!("Attempt to compare Map with non-Map")
392            }
393            (Date32(v1), Date32(v2)) => v1.cmp(v2),
394            (Date32(v1), Null) => {
395                if v1.is_none() {
396                    Ordering::Equal
397                } else {
398                    Ordering::Greater
399                }
400            }
401            (Date32(_), _) => panic!("Attempt to compare Date32 with non-Date32"),
402            (Date64(v1), Date64(v2)) => v1.cmp(v2),
403            (Date64(v1), Null) => {
404                if v1.is_none() {
405                    Ordering::Equal
406                } else {
407                    Ordering::Greater
408                }
409            }
410            (Date64(_), _) => panic!("Attempt to compare Date64 with non-Date64"),
411            (Time32Second(v1), Time32Second(v2)) => v1.cmp(v2),
412            (Time32Second(v1), Null) => {
413                if v1.is_none() {
414                    Ordering::Equal
415                } else {
416                    Ordering::Greater
417                }
418            }
419            (Time32Second(_), _) => panic!("Attempt to compare Time32Second with non-Time32Second"),
420            (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.cmp(v2),
421            (Time32Millisecond(v1), Null) => {
422                if v1.is_none() {
423                    Ordering::Equal
424                } else {
425                    Ordering::Greater
426                }
427            }
428            (Time32Millisecond(_), _) => {
429                panic!("Attempt to compare Time32Millisecond with non-Time32Millisecond")
430            }
431            (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.cmp(v2),
432            (Time64Microsecond(v1), Null) => {
433                if v1.is_none() {
434                    Ordering::Equal
435                } else {
436                    Ordering::Greater
437                }
438            }
439            (Time64Microsecond(_), _) => {
440                panic!("Attempt to compare Time64Microsecond with non-Time64Microsecond")
441            }
442            (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.cmp(v2),
443            (Time64Nanosecond(v1), Null) => {
444                if v1.is_none() {
445                    Ordering::Equal
446                } else {
447                    Ordering::Greater
448                }
449            }
450            (Time64Nanosecond(_), _) => {
451                panic!("Attempt to compare Time64Nanosecond with non-Time64Nanosecond")
452            }
453            (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.cmp(v2),
454            (TimestampSecond(v1, _), Null) => {
455                if v1.is_none() {
456                    Ordering::Equal
457                } else {
458                    Ordering::Greater
459                }
460            }
461            (TimestampSecond(_, _), _) => {
462                panic!("Attempt to compare TimestampSecond with non-TimestampSecond")
463            }
464            (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.cmp(v2),
465            (TimestampMillisecond(v1, _), Null) => {
466                if v1.is_none() {
467                    Ordering::Equal
468                } else {
469                    Ordering::Greater
470                }
471            }
472            (TimestampMillisecond(_, _), _) => {
473                panic!("Attempt to compare TimestampMillisecond with non-TimestampMillisecond")
474            }
475            (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.cmp(v2),
476            (TimestampMicrosecond(v1, _), Null) => {
477                if v1.is_none() {
478                    Ordering::Equal
479                } else {
480                    Ordering::Greater
481                }
482            }
483            (TimestampMicrosecond(_, _), _) => {
484                panic!("Attempt to compare TimestampMicrosecond with non-TimestampMicrosecond")
485            }
486            (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.cmp(v2),
487            (TimestampNanosecond(v1, _), Null) => {
488                if v1.is_none() {
489                    Ordering::Equal
490                } else {
491                    Ordering::Greater
492                }
493            }
494            (TimestampNanosecond(_, _), _) => {
495                panic!("Attempt to compare TimestampNanosecond with non-TimestampNanosecond")
496            }
497            (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.cmp(v2),
498            (IntervalYearMonth(v1), Null) => {
499                if v1.is_none() {
500                    Ordering::Equal
501                } else {
502                    Ordering::Greater
503                }
504            }
505            (IntervalYearMonth(_), _) => {
506                panic!("Attempt to compare IntervalYearMonth with non-IntervalYearMonth")
507            }
508            (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.cmp(v2),
509            (IntervalDayTime(v1), Null) => {
510                if v1.is_none() {
511                    Ordering::Equal
512                } else {
513                    Ordering::Greater
514                }
515            }
516            (IntervalDayTime(_), _) => {
517                panic!("Attempt to compare IntervalDayTime with non-IntervalDayTime")
518            }
519            (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.cmp(v2),
520            (IntervalMonthDayNano(v1), Null) => {
521                if v1.is_none() {
522                    Ordering::Equal
523                } else {
524                    Ordering::Greater
525                }
526            }
527            (IntervalMonthDayNano(_), _) => {
528                panic!("Attempt to compare IntervalMonthDayNano with non-IntervalMonthDayNano")
529            }
530            (DurationSecond(v1), DurationSecond(v2)) => v1.cmp(v2),
531            (DurationSecond(v1), Null) => {
532                if v1.is_none() {
533                    Ordering::Equal
534                } else {
535                    Ordering::Greater
536                }
537            }
538            (DurationSecond(_), _) => {
539                panic!("Attempt to compare DurationSecond with non-DurationSecond")
540            }
541            (DurationMillisecond(v1), DurationMillisecond(v2)) => v1.cmp(v2),
542            (DurationMillisecond(v1), Null) => {
543                if v1.is_none() {
544                    Ordering::Equal
545                } else {
546                    Ordering::Greater
547                }
548            }
549            (DurationMillisecond(_), _) => {
550                panic!("Attempt to compare DurationMillisecond with non-DurationMillisecond")
551            }
552            (DurationMicrosecond(v1), DurationMicrosecond(v2)) => v1.cmp(v2),
553            (DurationMicrosecond(v1), Null) => {
554                if v1.is_none() {
555                    Ordering::Equal
556                } else {
557                    Ordering::Greater
558                }
559            }
560            (DurationMicrosecond(_), _) => {
561                panic!("Attempt to compare DurationMicrosecond with non-DurationMicrosecond")
562            }
563            (DurationNanosecond(v1), DurationNanosecond(v2)) => v1.cmp(v2),
564            (DurationNanosecond(v1), Null) => {
565                if v1.is_none() {
566                    Ordering::Equal
567                } else {
568                    Ordering::Greater
569                }
570            }
571            (DurationNanosecond(_), _) => {
572                panic!("Attempt to compare DurationNanosecond with non-DurationNanosecond")
573            }
574            (Struct(_arr), Struct(_arr2)) => todo!(),
575            (Struct(arr), Null) => {
576                if arr.is_empty() {
577                    Ordering::Equal
578                } else {
579                    Ordering::Greater
580                }
581            }
582            (Struct(_arr), _) => panic!("Attempt to compare Struct with non-Struct"),
583            (Dictionary(_k1, _v1), Dictionary(_k2, _v2)) => todo!(),
584            (Dictionary(_, v1), Null) => Self(*v1.clone()).cmp(&Self(ScalarValue::Null)),
585            (Dictionary(_, _), _) => panic!("Attempt to compare Dictionary with non-Dictionary"),
586            // What would a btree of unions even look like?  May not be possible.
587            (Union(_, _, _), _) => todo!("Support for union scalars"),
588            (RunEndEncoded(_, _, _), _) => {
589                todo!("Support for run-end encoded scalars")
590            }
591            (Null, Null) => Ordering::Equal,
592            (Null, _) => todo!(),
593        }
594    }
595}
596
/// Summary of a single page as stored in [`BTreeLookup`].
///
/// Records are grouped in the lookup tree under the page's minimum value, so
/// only the maximum needs to be stored here.
#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
struct PageRecord {
    /// Largest value contained in the page
    max: OrderableScalarValue,
    /// Identifier of the page this record describes
    page_number: u32,
}
602
/// Extension trait adding a "strict predecessor" lookup to ordered maps.
trait BTreeMapExt<K, V> {
    /// Returns the entry with the greatest key that is strictly less than `key`,
    /// or `None` when no such entry exists.
    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)>;
}

impl<K: Ord, V> BTreeMapExt<K, V> for BTreeMap<K, V> {
    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)> {
        // `..key` is the half-open range (unbounded, key): every entry strictly
        // below `key`.  The last one, if any, is the predecessor.
        let mut strictly_below = self.range::<K, _>(..key);
        strictly_below.next_back()
    }
}
613
/// An in-memory structure that can quickly satisfy scalar queries using a btree of ScalarValue
///
/// The structure only answers the question "which pages might contain matching
/// values"; it does not hold the page contents.
#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
pub struct BTreeLookup {
    /// Maps the minimum value of a page to the records of every page that
    /// starts at that value (several pages can share the same minimum).
    tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
    /// Pages where the value may be null (does not include all_null_pages)
    null_pages: Vec<u32>,
    /// Pages that are entirely null
    all_null_pages: Vec<u32>,
}
623
624impl BTreeLookup {
625    fn empty() -> Self {
626        Self {
627            tree: BTreeMap::new(),
628            null_pages: Vec::new(),
629            all_null_pages: Vec::new(),
630        }
631    }
632}
633
/// The outcome of testing a page against a query.
#[derive(Debug, Copy, Clone)]
enum Matches {
    /// Only some values of the page (identified by its page id) may match
    Some(u32),
    /// Every value of the page (identified by its page id) matches
    All(u32),
}

impl Matches {
    /// Returns the id of the page, regardless of how completely it matched.
    fn page_id(&self) -> u32 {
        // Both variants carry the page id in the same position, so a single
        // irrefutable or-pattern extracts it.
        let (Self::Some(id) | Self::All(id)) = self;
        *id
    }
}
648
649impl BTreeLookup {
    /// Assembles a lookup from its already-computed parts.
    ///
    /// * `tree` - page records grouped by the minimum value of their page
    /// * `null_pages` - pages where the value may be null
    /// * `all_null_pages` - pages that are entirely null
    ///
    /// The arguments are stored as given; no validation is performed here.
    fn new(
        tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
        null_pages: Vec<u32>,
        all_null_pages: Vec<u32>,
    ) -> Self {
        Self {
            tree,
            null_pages,
            all_null_pages,
        }
    }
661
662    // All pages that could have a value equal to val
663    fn pages_eq(&self, query: &OrderableScalarValue) -> Vec<Matches> {
664        if query.0.is_null() {
665            self.pages_null()
666        } else {
667            self.pages_between((Bound::Included(query), Bound::Excluded(query)))
668        }
669    }
670
671    // All pages that could have a value equal to one of the values
672    fn pages_in(&self, values: impl IntoIterator<Item = OrderableScalarValue>) -> Vec<Matches> {
673        // TODO: Right now we convert all Matches::All into Matches::Some.  We could refine this.
674        // It would improve performance on low cardinality data.
675        let page_lists = values
676            .into_iter()
677            .map(|val| {
678                self.pages_eq(&val)
679                    .into_iter()
680                    .map(|matches| matches.page_id())
681            })
682            .collect::<Vec<_>>();
683        let total_size = page_lists.iter().map(|set| set.len()).sum();
684        let mut heap = BinaryHeap::with_capacity(total_size);
685        for page_list in page_lists {
686            heap.extend(page_list);
687        }
688        let mut all_pages = heap.into_sorted_vec();
689        all_pages.dedup();
690        all_pages.into_iter().map(Matches::Some).collect()
691    }
692
693    // All pages that could have a value in the range
694    fn pages_between(
695        &self,
696        range: (Bound<&OrderableScalarValue>, Bound<&OrderableScalarValue>),
697    ) -> Vec<Matches> {
698        // We need to grab a little bit left of the given range because the query might be 7
699        // and the first page might be something like 5-10.
700        let lower_bound = match range.0 {
701            Bound::Unbounded => Bound::Unbounded,
702            // It doesn't matter if the bound is exclusive or inclusive.  We are going to grab
703            // the first node whose min is strictly less than the given bound.  Then we grab
704            // all nodes greater than or equal to that
705            //
706            // We have to peek a bit to the left because we might have something like a lower
707            // bound of 7 and there is a page [5-10] we want to search for.
708            Bound::Included(lower) => self
709                .tree
710                .largest_node_less(lower)
711                .map(|val| Bound::Included(val.0))
712                .unwrap_or(Bound::Unbounded),
713            Bound::Excluded(lower) => self
714                .tree
715                .largest_node_less(lower)
716                .map(|val| Bound::Included(val.0))
717                .unwrap_or(Bound::Unbounded),
718        };
719        let upper_bound = match range.1 {
720            Bound::Unbounded => Bound::Unbounded,
721            Bound::Included(upper) => Bound::Included(upper),
722            // Even if the upper bound is excluded we need to include it on an [x, x) query.  This is because the
723            // query might be [x, x).  Our lower bound might find some [a-x] bucket and we still
724            // want to include any [x, z] bucket.
725            //
726            // We could be slightly more accurate here and only include the upper bound if the lower bound
727            // is defined, inclusive, and equal to the upper bound.  However, let's keep it simple for now.  This
728            // should only affect the probably rare case that our query is a true range query and the value
729            // matches an upper bound.  This will all be moot if/when we merge pages.
730            Bound::Excluded(upper) => Bound::Included(upper),
731        };
732
733        match (lower_bound, upper_bound) {
734            (Bound::Excluded(lower), Bound::Excluded(upper))
735            | (Bound::Excluded(lower), Bound::Included(upper))
736            | (Bound::Included(lower), Bound::Excluded(upper)) => {
737                // It's not really clear what (Included(5), Excluded(5)) would mean so we
738                // interpret it as an empty range which matches rust's BTreeMap behavior
739                if lower >= upper {
740                    return vec![];
741                }
742            }
743            (Bound::Included(lower), Bound::Included(upper)) => {
744                if lower > upper {
745                    return vec![];
746                }
747            }
748            _ => {}
749        }
750
751        let mut matches = Vec::new();
752
753        for (min, page_records) in self.tree.range((lower_bound, upper_bound)) {
754            for page_record in page_records {
755                match lower_bound {
756                    Bound::Unbounded => {}
757                    Bound::Included(lower) => {
758                        if page_record.max.cmp(lower) == Ordering::Less {
759                            continue;
760                        }
761                    }
762                    Bound::Excluded(lower) => {
763                        if page_record.max.cmp(lower) != Ordering::Greater {
764                            continue;
765                        }
766                    }
767                }
768                // At this point we know the page record matches at least some values.
769                // We should test to see if ALL values are a match.
770
771                if min.0.is_null() || page_record.max.0.is_null() {
772                    // If there are nulls then we just use Matches::Some
773                    matches.push(Matches::Some(page_record.page_number));
774                    continue;
775                }
776
777                match range.0 {
778                    // range.0 < X therefore if the smallest value is not strictly greater than
779                    // the lower bound we only have partial match
780                    Bound::Excluded(lower) => {
781                        if min.cmp(lower) != Ordering::Greater {
782                            matches.push(Matches::Some(page_record.page_number));
783                            continue;
784                        }
785                    }
786                    // range.0 <= X therefore if the smallest value is not greater than or equal
787                    // to the lower bound we only have partial match
788                    Bound::Included(lower) => {
789                        if min.cmp(lower) == Ordering::Less {
790                            matches.push(Matches::Some(page_record.page_number));
791                            continue;
792                        }
793                    }
794                    Bound::Unbounded => {}
795                }
796                match range.1 {
797                    // X < range.1 therefore if the largest value is not strictly less than
798                    // the upper bound we only have partial match
799                    Bound::Excluded(upper) => {
800                        if page_record.max.cmp(upper) != Ordering::Less {
801                            matches.push(Matches::Some(page_record.page_number));
802                            continue;
803                        }
804                    }
805                    // X <= range.1 therefore if the largest value is not less than or equal to
806                    // the upper bound we only have partial match
807                    Bound::Included(upper) => {
808                        if page_record.max.cmp(upper) == Ordering::Greater {
809                            matches.push(Matches::Some(page_record.page_number));
810                            continue;
811                        }
812                    }
813                    Bound::Unbounded => {}
814                }
815                // The min is greater than the lower bound and the max is less than the upper bound
816                // so we have a full match
817                matches.push(Matches::All(page_record.page_number));
818            }
819        }
820
821        matches
822    }
823
824    fn pages_null(&self) -> Vec<Matches> {
825        self.null_pages
826            .iter()
827            .map(|page_id| Matches::Some(*page_id))
828            .chain(self.all_null_pages.iter().copied().map(Matches::All))
829            .collect()
830    }
831}
832
833// We only need to open a file reader for pages if we need to load a page.  If all
834// pages are cached we don't open it.  If we do open it we should only open it once.
835#[derive(Clone)]
836struct LazyIndexReader {
837    index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
838    store: Arc<dyn IndexStore>,
839    ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
840}
841
842impl LazyIndexReader {
843    fn new(
844        store: Arc<dyn IndexStore>,
845        ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
846    ) -> Self {
847        Self {
848            index_reader: Arc::new(tokio::sync::Mutex::new(None)),
849            store,
850            ranges_to_files,
851        }
852    }
853
854    async fn get(&self) -> Result<Arc<dyn IndexReader>> {
855        let mut reader = self.index_reader.lock().await;
856        if reader.is_none() {
857            let index_reader = if let Some(ranges_to_files) = &self.ranges_to_files {
858                Arc::new(LazyRangedIndexReader::new(
859                    self.store.clone(),
860                    ranges_to_files.clone(),
861                ))
862            } else {
863                self.store.open_index_file(BTREE_PAGES_NAME).await?
864            };
865            *reader = Some(index_reader);
866        }
867        Ok(reader.as_ref().unwrap().clone())
868    }
869}
870
871/// Index reader to dispatch page query to corresponding ranged page-files.
872struct LazyRangedIndexReader {
873    #[allow(clippy::type_complexity)]
874    readers:
875        Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::OnceCell<Arc<dyn IndexReader>>>>>>,
876    store: Arc<dyn IndexStore>,
877    ranges_to_files: Arc<RangeInclusiveMap<u32, (String, u32)>>,
878}
879
880impl LazyRangedIndexReader {
881    fn new(
882        store: Arc<dyn IndexStore>,
883        ranges_to_files: Arc<RangeInclusiveMap<u32, (String, u32)>>,
884    ) -> Self {
885        Self {
886            readers: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
887            store,
888            ranges_to_files,
889        }
890    }
891
892    async fn get_reader(&self, file_name: &str) -> Result<Arc<dyn IndexReader>> {
893        let reader_cell = {
894            let mut guard = self.readers.lock().await;
895            guard
896                .entry(file_name.to_string())
897                .or_insert_with(|| Arc::new(tokio::sync::OnceCell::new()))
898                .clone()
899        };
900        let reader = reader_cell
901            .get_or_try_init(|| async { self.store.open_index_file(file_name).await })
902            .await?;
903        Ok(reader.clone())
904    }
905
906    async fn get_reader_and_local_page_idx(
907        &self,
908        page_idx: u32,
909    ) -> Result<(Arc<dyn IndexReader>, u32)> {
910        let (page_file_name, offset) = self.ranges_to_files.get(&page_idx).ok_or_else(|| {
911            Error::internal(format!(
912                "Unexpected page index, index {} is out of range.",
913                page_idx
914            ))
915        })?;
916        let reader = self.get_reader(page_file_name).await?;
917        Ok((reader.clone(), page_idx - *offset))
918    }
919}
920
921#[async_trait]
922impl IndexReader for LazyRangedIndexReader {
923    async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch> {
924        let (reader, local_page_idx) = self.get_reader_and_local_page_idx(n as u32).await?;
925        reader
926            .read_record_batch(local_page_idx as u64, batch_size)
927            .await
928    }
929
930    async fn read_range(
931        &self,
932        _range: std::ops::Range<usize>,
933        _projection: Option<&[&str]>,
934    ) -> Result<RecordBatch> {
935        unimplemented!("Read range is not implemented for lazy page file reader.");
936    }
937
938    async fn num_batches(&self, batch_size: u64) -> u32 {
939        let mut total_batches = 0;
940        for (_, (file_name, _)) in self.ranges_to_files.iter() {
941            let reader = self
942                .get_reader(file_name)
943                .await
944                .unwrap_or_else(|_| panic!("Cannot open page file {}.", file_name));
945            total_batches += reader.as_ref().num_batches(batch_size).await;
946        }
947        total_batches
948    }
949
950    fn num_rows(&self) -> usize {
951        unimplemented!("only async functions are available for lazy page index reader.");
952    }
953
954    fn schema(&self) -> &lance_core::datatypes::Schema {
955        unimplemented!("only async functions are available for lazy page index reader.");
956    }
957}
958
959/// A btree index satisfies scalar queries using a b tree
960///
961/// The upper layers of the btree are expected to be cached and, when unloaded,
962/// are stored in a btree structure in memory.  The leaves of the btree are left
963/// to be searched by some other kind of index (currently a flat search).
964///
965/// This strikes a balance between an expensive memory structure containing all
966/// of the values and an expensive disk structure that can't be efficiently searched.
967///
968/// For example, given 1Bi values we can store 256Ki leaves of size 4Ki.  We only
969/// need memory space for 256Ki leaves (depends on the data type but usually a few MiB
970/// at most) and can narrow our search to 4Ki values.
971///
972// Cache key implementation for type-safe cache access
973#[derive(Debug, Clone, DeepSizeOf)]
974pub struct CachedScalarIndex(Arc<dyn ScalarIndex>);
975
976impl CachedScalarIndex {
977    pub fn new(index: Arc<dyn ScalarIndex>) -> Self {
978        Self(index)
979    }
980
981    pub fn into_inner(self) -> Arc<dyn ScalarIndex> {
982        self.0
983    }
984}
985
986#[derive(Debug, Clone)]
987pub struct BTreePageKey {
988    pub page_number: u32,
989}
990
991impl CacheKey for BTreePageKey {
992    type ValueType = FlatIndex;
993
994    fn key(&self) -> std::borrow::Cow<'_, str> {
995        format!("page-{}", self.page_number).into()
996    }
997
998    fn type_name() -> &'static str {
999        "BTreePage"
1000    }
1001}
1002
/// A scalar index that keeps a lookup of page boundaries in memory (`page_lookup`) and
/// reads the pages themselves from `store` on demand.
///
/// Note: this is very similar to the IVF index except we store the IVF part in a btree
/// for faster lookup
#[derive(Clone, Debug)]
pub struct BTreeIndex {
    /// In-memory lookup used to decide which pages may contain matches for a query.
    page_lookup: Arc<BTreeLookup>,
    /// Cache through which pages are looked up (keyed by `BTreePageKey`).
    index_cache: WeakLanceCache,
    /// Store from which the index files are opened.
    store: Arc<dyn IndexStore>,
    /// Data type of the indexed values.
    data_type: DataType,
    /// Batch size handed to the page reader whenever pages are read or counted.
    batch_size: u64,

    /// A map that translates a global_page_idx stored in the single lookup file into the
    /// specific page file and local_page_idx.
    ///
    /// This is the key data structure used for efficiently reading data from a merged,
    /// range-partitioned index. It stores mappings from a contiguous range of global page
    /// indices to a tuple containing:
    ///
    /// 1. The path to the corresponding page file (e.g., `part_i_page_file.lance`).
    /// 2. The start offset that was used to calculate the local_page_idx for that partition.
    ///
    /// When a query needs to access a specific page using its `global_page_idx`:
    ///
    /// 1. The `global_page_idx` is used to look up its range in this `RangeInclusiveMap`,
    ///    and the map returns the `(file_path, start_offset)` tuple for that range.
    /// 2. The `local_page_idx` is calculated using the formula:
    ///    `local_page_idx = global_page_idx - start_offset`.
    /// 3. With the `file_path` and `local_page_idx`, the system can directly open the
    ///    correct partition file and read the specific page.
    ///
    /// # Example
    ///
    /// If the map contains an entry `(100..=199) => ("part_2_page_file.lance", 100)`, and we
    /// need to find `global_page_idx = 142`:
    ///
    /// - The map finds that 142 falls within the range `100..=199`, and it returns
    ///   `("part_2_page_file.lance", 100)`.
    /// - The local page_idx is calculated: `142 - 100 = 42`.
    /// - The system now knows to read page `42` from the file `part_2_page_file.lance`.
    ///
    /// `None` when the index is not range-partitioned.
    ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
    /// When present, used to remap the row ids of each page as it is read.
    frag_reuse_index: Option<Arc<FragReuseIndex>>,
}
1044
1045impl DeepSizeOf for BTreeIndex {
1046    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
1047        // We don't include the index cache, or anything stored in it. For example:
1048        // sub_index and fri.
1049        self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context)
1050    }
1051}
1052
1053impl BTreeIndex {
1054    #[allow(clippy::too_many_arguments)]
1055    fn new(
1056        page_lookup: Arc<BTreeLookup>,
1057        store: Arc<dyn IndexStore>,
1058        data_type: DataType,
1059        index_cache: WeakLanceCache,
1060        batch_size: u64,
1061        ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
1062        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1063    ) -> Self {
1064        Self {
1065            page_lookup,
1066            store,
1067            data_type,
1068            index_cache,
1069            batch_size,
1070            ranges_to_files,
1071            frag_reuse_index,
1072        }
1073    }
1074
1075    async fn lookup_page(
1076        &self,
1077        page_number: u32,
1078        index_reader: LazyIndexReader,
1079        metrics: &dyn MetricsCollector,
1080    ) -> Result<Arc<FlatIndex>> {
1081        self.index_cache
1082            .get_or_insert_with_key(BTreePageKey { page_number }, move || async move {
1083                self.read_page(page_number, index_reader, metrics).await
1084            })
1085            .await
1086    }
1087
1088    #[instrument(level = "debug", skip_all)]
1089    async fn read_page(
1090        &self,
1091        page_number: u32,
1092        index_reader: LazyIndexReader,
1093        metrics: &dyn MetricsCollector,
1094    ) -> Result<FlatIndex> {
1095        metrics.record_part_load();
1096        info!(target: TRACE_IO_EVENTS, r#type=IO_TYPE_LOAD_SCALAR_PART, index_type="btree", part_id=page_number);
1097        let index_reader = index_reader.get().await?;
1098        let mut serialized_page = index_reader
1099            .read_record_batch(page_number as u64, self.batch_size)
1100            .await?;
1101        if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
1102            serialized_page =
1103                frag_reuse_index_ref.remap_row_ids_record_batch(serialized_page, 1)?;
1104        }
1105        FlatIndex::try_new(serialized_page)
1106    }
1107
1108    async fn search_page(
1109        &self,
1110        query: &SargableQuery,
1111        matches: Matches,
1112        index_reader: LazyIndexReader,
1113        metrics: &dyn MetricsCollector,
1114    ) -> Result<NullableRowAddrSet> {
1115        let subindex = self
1116            .lookup_page(matches.page_id(), index_reader, metrics)
1117            .await?;
1118
1119        match matches {
1120            Matches::Some(_) => {
1121                // TODO: If this is an IN query we can perhaps simplify the subindex query by restricting it to the
1122                // values that might be in the page.  E.g. if we are searching for X IN [5, 3, 7] and five is in pages
1123                // 1 and 2 and three is in page 2 and seven is in pages 8 and 9, then when searching page 2 we only need
1124                // to search for X IN [5, 3]
1125                subindex.search(query, metrics)
1126            }
1127            Matches::All(_) => Ok(match query {
1128                // This means we hit an all-null page so just grab all row ids as true
1129                SargableQuery::IsNull() => subindex.all_ignore_nulls(),
1130                _ => subindex.all(),
1131            }),
1132        }
1133    }
1134
1135    #[instrument(level = "debug", skip_all)]
1136    fn try_from_serialized(
1137        data: RecordBatch,
1138        store: Arc<dyn IndexStore>,
1139        index_cache: &LanceCache,
1140        batch_size: u64,
1141        ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
1142        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1143    ) -> Result<Self> {
1144        let mut map = BTreeMap::<OrderableScalarValue, Vec<PageRecord>>::new();
1145        // Pages that have at least one null value
1146        let mut null_pages = Vec::<u32>::new();
1147        // Pages that are entirely null
1148        let mut all_null_pages = Vec::<u32>::new();
1149
1150        if data.num_rows() == 0 {
1151            let data_type = data.column(0).data_type().clone();
1152            let page_lookup = Arc::new(BTreeLookup::empty());
1153            return Ok(Self::new(
1154                page_lookup,
1155                store,
1156                data_type,
1157                WeakLanceCache::from(index_cache),
1158                batch_size,
1159                ranges_to_files,
1160                frag_reuse_index,
1161            ));
1162        }
1163
1164        let mins = data.column(0);
1165        let maxs = data.column(1);
1166        let null_counts = data
1167            .column(2)
1168            .as_any()
1169            .downcast_ref::<UInt32Array>()
1170            .unwrap();
1171        let page_numbers = data
1172            .column(3)
1173            .as_any()
1174            .downcast_ref::<UInt32Array>()
1175            .unwrap();
1176
1177        for idx in 0..data.num_rows() {
1178            let min = OrderableScalarValue(ScalarValue::try_from_array(&mins, idx)?);
1179            let max = OrderableScalarValue(ScalarValue::try_from_array(&maxs, idx)?);
1180            let null_count = null_counts.values()[idx];
1181            let page_number = page_numbers.values()[idx];
1182
1183            // If the page is entirely null don't even bother putting it in the tree
1184            if max.0.is_null() {
1185                all_null_pages.push(page_number);
1186                // continue so we don't add it to the null_pages
1187                continue;
1188            } else {
1189                map.entry(min)
1190                    .or_default()
1191                    .push(PageRecord { max, page_number });
1192            }
1193
1194            if null_count > 0 {
1195                null_pages.push(page_number);
1196            }
1197        }
1198
1199        let last_max = ScalarValue::try_from_array(&maxs, data.num_rows() - 1)?;
1200        map.entry(OrderableScalarValue(last_max)).or_default();
1201
1202        let data_type = mins.data_type();
1203
1204        let page_lookup = Arc::new(BTreeLookup::new(map, null_pages, all_null_pages));
1205
1206        Ok(Self::new(
1207            page_lookup,
1208            store,
1209            data_type.clone(),
1210            WeakLanceCache::from(index_cache),
1211            batch_size,
1212            ranges_to_files,
1213            frag_reuse_index,
1214        ))
1215    }
1216
1217    async fn load(
1218        store: Arc<dyn IndexStore>,
1219        frag_reuse_index: Option<Arc<FragReuseIndex>>,
1220        index_cache: &LanceCache,
1221    ) -> Result<Arc<Self>> {
1222        let page_lookup_file = store.open_index_file(BTREE_LOOKUP_NAME).await?;
1223        let num_rows_in_lookup = page_lookup_file.num_rows();
1224        let serialized_lookup = page_lookup_file
1225            .read_range(0..num_rows_in_lookup, None)
1226            .await?;
1227        let file_schema = page_lookup_file.schema();
1228        let batch_size = file_schema
1229            .metadata
1230            .get(BATCH_SIZE_META_KEY)
1231            .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
1232            .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
1233
1234        let range_partitioned = file_schema
1235            .metadata
1236            .get(RANGE_PARTITIONED_META_KEY)
1237            .map(|bs| bs.parse().unwrap_or(DEFAULT_RANGE_PARTITIONED))
1238            .unwrap_or(DEFAULT_RANGE_PARTITIONED);
1239        // For range-partitioned indices, construct the `ranges_to_files` map.
1240        // This converts the list of (partition ID, page count) from metadata into a map
1241        // from a global page range to its corresponding file and starting offset.
1242        let ranges_to_files = if range_partitioned {
1243            let part_sizes_str = file_schema
1244            .metadata
1245            .get(PAGE_NUM_PER_RANGE_PARTITION_META_KEY)
1246            .expect("Range-partitioned Btree lookup file must have page-number-per-range-file metadata!");
1247            let part_sizes_vec: Vec<(u64, u32)> = serde_json::from_str(part_sizes_str)?;
1248            let mut offset: u32 = 0;
1249
1250            let range_map = part_sizes_vec
1251                .into_iter()
1252                .map(|(id, size)| {
1253                    let range = offset..=(offset + size - 1);
1254                    let file_with_size = (part_page_data_file_path(id), offset);
1255                    offset += size;
1256                    (range, file_with_size)
1257                })
1258                .collect();
1259
1260            Some(Arc::new(range_map))
1261        } else {
1262            None
1263        };
1264
1265        Ok(Arc::new(Self::try_from_serialized(
1266            serialized_lookup,
1267            store,
1268            index_cache,
1269            batch_size,
1270            ranges_to_files,
1271            frag_reuse_index,
1272        )?))
1273    }
1274
1275    // For legacy reasons a btree index expects the training input to use value/_rowid
1276    fn train_schema(&self) -> Schema {
1277        let value_field = Field::new(VALUE_COLUMN_NAME, self.data_type.clone(), true);
1278        let row_id_field = Field::new(ROW_ID, DataType::UInt64, false);
1279        Schema::new(vec![value_field, row_id_field])
1280    }
1281
1282    /// Create a stream of all the data in the index, in the same format used to train the index
1283    async fn into_data_stream(self) -> Result<SendableRecordBatchStream> {
1284        let lazy_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1285        let reader = lazy_reader.get().await?;
1286        let new_schema = Arc::new(self.train_schema());
1287        let new_schema_clone = new_schema.clone();
1288        let reader_stream = IndexReaderStream::new(reader, self.batch_size).await;
1289        let batches = reader_stream
1290            .map(|fut| fut.map_err(DataFusionError::from))
1291            .buffered(self.store.io_parallelism())
1292            .map_ok(move |batch| {
1293                RecordBatch::try_new(
1294                    new_schema.clone(),
1295                    vec![batch.column(0).clone(), batch.column(1).clone()],
1296                )
1297                .unwrap()
1298            })
1299            .boxed();
1300        Ok(Box::pin(RecordBatchStreamAdapter::new(
1301            new_schema_clone,
1302            batches,
1303        )))
1304    }
1305
1306    async fn combine_old_new(
1307        self,
1308        new_data: SendableRecordBatchStream,
1309        chunk_size: u64,
1310        old_data_filter: Option<OldIndexDataFilter>,
1311    ) -> Result<SendableRecordBatchStream> {
1312        let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?;
1313
1314        let new_input = Arc::new(OneShotExec::new(new_data));
1315        let old_stream = self.into_data_stream().await?;
1316        let old_stream = match old_data_filter {
1317            Some(filter) => filter_row_ids(old_stream, filter),
1318            None => old_stream,
1319        };
1320        let old_input = Arc::new(OneShotExec::new(old_stream));
1321        debug_assert_eq!(
1322            old_input.schema().flattened_fields().len(),
1323            new_input.schema().flattened_fields().len()
1324        );
1325
1326        let sort_expr = PhysicalSortExpr {
1327            expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
1328            options: SortOptions {
1329                descending: false,
1330                nulls_first: true,
1331            },
1332        };
1333        // The UnionExec creates multiple partitions but the SortPreservingMergeExec merges
1334        // them back into a single partition.
1335        let all_data = UnionExec::try_new(vec![old_input, new_input])?;
1336        let ordered = Arc::new(SortPreservingMergeExec::new([sort_expr].into(), all_data));
1337
1338        let unchunked = execute_plan(
1339            ordered,
1340            LanceExecutionOptions {
1341                use_spilling: true,
1342                ..Default::default()
1343            },
1344        )?;
1345        Ok(chunk_concat_stream(unchunked, chunk_size as usize))
1346    }
1347}
1348
1349/// Filter a stream of record batches using the selection semantics encapsulated
1350/// by `old_data_filter`.
1351fn filter_row_ids(
1352    stream: SendableRecordBatchStream,
1353    old_data_filter: OldIndexDataFilter,
1354) -> SendableRecordBatchStream {
1355    let schema = stream.schema();
1356    let filtered = stream.map(move |batch_result| {
1357        let batch = batch_result?;
1358        let row_ids = batch[ROW_ID]
1359            .as_any()
1360            .downcast_ref::<arrow_array::UInt64Array>()
1361            .ok_or_else(|| Error::internal("expected UInt64Array for row_id column"))?;
1362        let mask = old_data_filter.filter_row_ids(row_ids);
1363        Ok(arrow_select::filter::filter_record_batch(&batch, &mask)?)
1364    });
1365    Box::pin(RecordBatchStreamAdapter::new(schema, filtered))
1366}
1367
1368fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
1369    match bound {
1370        Bound::Unbounded => Bound::Unbounded,
1371        Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
1372        Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
1373    }
1374}
1375
1376fn serialize_with_display<T: Display, S: Serializer>(
1377    value: &Option<T>,
1378    serializer: S,
1379) -> std::result::Result<S::Ok, S::Error> {
1380    if let Some(value) = value {
1381        serializer.collect_str(value)
1382    } else {
1383        serializer.collect_str("N/A")
1384    }
1385}
1386
/// Summary of a btree index, serialized by `BTreeIndex::statistics`.
#[derive(Serialize)]
struct BTreeStatistics {
    /// First key of the page lookup tree; written as a string, `N/A` when absent.
    #[serde(serialize_with = "serialize_with_display")]
    min: Option<OrderableScalarValue>,
    /// Last key of the page lookup tree; written as a string, `N/A` when absent.
    #[serde(serialize_with = "serialize_with_display")]
    max: Option<OrderableScalarValue>,
    /// Number of keys in the page lookup tree.
    num_pages: u32,
}
1395
1396#[async_trait]
1397impl Index for BTreeIndex {
1398    fn as_any(&self) -> &dyn Any {
1399        self
1400    }
1401
1402    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
1403        self
1404    }
1405
1406    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
1407        Err(Error::not_supported_source(
1408            "BTreeIndex is not vector index".into(),
1409        ))
1410    }
1411
1412    async fn prewarm(&self) -> Result<()> {
1413        let index_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1414        let reader = index_reader.get().await?;
1415        let num_pages = reader.num_batches(self.batch_size).await;
1416        let mut pages = stream::iter(0..num_pages)
1417            .map(|page_idx| {
1418                let index_reader = index_reader.clone();
1419                async move {
1420                    let page = self
1421                        .read_page(page_idx, index_reader, &NoOpMetricsCollector)
1422                        .await?;
1423                    Result::Ok((page_idx, page))
1424                }
1425            })
1426            .buffer_unordered(get_num_compute_intensive_cpus());
1427
1428        while let Some((page_idx, page)) = pages.try_next().await? {
1429            let inserted = self
1430                .index_cache
1431                .insert_with_key(
1432                    &BTreePageKey {
1433                        page_number: page_idx,
1434                    },
1435                    Arc::new(page),
1436                )
1437                .await;
1438
1439            if !inserted {
1440                return Err(Error::internal(
1441                    "Failed to prewarm index: cache is no longer available".to_string(),
1442                ));
1443            }
1444        }
1445
1446        Ok(())
1447    }
1448
1449    fn index_type(&self) -> IndexType {
1450        IndexType::BTree
1451    }
1452
1453    fn statistics(&self) -> Result<serde_json::Value> {
1454        let min = self
1455            .page_lookup
1456            .tree
1457            .first_key_value()
1458            .map(|(k, _)| k.clone());
1459        let max = self
1460            .page_lookup
1461            .tree
1462            .last_key_value()
1463            .map(|(k, _)| k.clone());
1464        serde_json::to_value(&BTreeStatistics {
1465            num_pages: self.page_lookup.tree.len() as u32,
1466            min,
1467            max,
1468        })
1469        .map_err(|err| err.into())
1470    }
1471
1472    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
1473        let mut frag_ids = RoaringBitmap::default();
1474
1475        let lazy_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1476        let sub_index_reader = lazy_reader.get().await?;
1477        let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1478            .await
1479            .buffered(self.store.io_parallelism());
1480        while let Some(serialized) = reader_stream.try_next().await? {
1481            let page = FlatIndex::try_new(serialized)?;
1482            frag_ids |= page.calculate_included_frags()?;
1483        }
1484
1485        Ok(frag_ids)
1486    }
1487}
1488
1489#[async_trait]
1490impl ScalarIndex for BTreeIndex {
1491    async fn search(
1492        &self,
1493        query: &dyn AnyQuery,
1494        metrics: &dyn MetricsCollector,
1495    ) -> Result<SearchResult> {
1496        let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
1497        let mut pages = match query {
1498            SargableQuery::Equals(val) => self
1499                .page_lookup
1500                .pages_eq(&OrderableScalarValue(val.clone())),
1501            SargableQuery::Range(start, end) => self
1502                .page_lookup
1503                .pages_between((wrap_bound(start).as_ref(), wrap_bound(end).as_ref())),
1504            SargableQuery::IsIn(values) => self
1505                .page_lookup
1506                .pages_in(values.iter().map(|val| OrderableScalarValue(val.clone()))),
1507            SargableQuery::FullTextSearch(_) => {
1508                return Err(Error::invalid_input(
1509                    "full text search is not supported for BTree index, build a inverted index for it",
1510                ));
1511            }
1512            SargableQuery::IsNull() => self.page_lookup.pages_null(),
1513            SargableQuery::LikePrefix(prefix) => {
1514                // Convert LikePrefix to a range query: [prefix, next_prefix)
1515                match prefix {
1516                    ScalarValue::Utf8(Some(s)) => {
1517                        let start = Bound::Included(OrderableScalarValue(prefix.clone()));
1518                        let end = match compute_next_prefix(s) {
1519                            Some(next) => {
1520                                Bound::Excluded(OrderableScalarValue(ScalarValue::Utf8(Some(next))))
1521                            }
1522                            None => Bound::Unbounded,
1523                        };
1524                        self.page_lookup
1525                            .pages_between((start.as_ref(), end.as_ref()))
1526                    }
1527                    ScalarValue::LargeUtf8(Some(s)) => {
1528                        let start = Bound::Included(OrderableScalarValue(prefix.clone()));
1529                        let end = match compute_next_prefix(s) {
1530                            Some(next) => Bound::Excluded(OrderableScalarValue(
1531                                ScalarValue::LargeUtf8(Some(next)),
1532                            )),
1533                            None => Bound::Unbounded,
1534                        };
1535                        self.page_lookup
1536                            .pages_between((start.as_ref(), end.as_ref()))
1537                    }
1538                    _ => {
1539                        // Conservative: return all pages for non-string types
1540                        // This is consistent with ZoneMap behavior
1541                        self.page_lookup
1542                            .pages_between((Bound::Unbounded, Bound::Unbounded))
1543                    }
1544                }
1545            }
1546        };
1547
1548        // For non-IsNull queries, also include null pages so that null row IDs
1549        // are tracked in the result. Any comparison with NULL yields NULL, and
1550        // we need this information for correct three-valued logic (e.g. NOT,
1551        // OR). Without this, a query like `NOT(x = 0)` on data where 0 doesn't
1552        // exist would incorrectly include NULL rows.
1553        //
1554        // We add them as Matches::Some (not Matches::All) so that
1555        // FlatIndex::search() evaluates the predicate and correctly marks
1556        // the rows as NULL rather than TRUE.
1557        if !matches!(query, SargableQuery::IsNull()) {
1558            let existing: HashSet<u32> = pages.iter().map(|m| m.page_id()).collect();
1559            for &page_id in self
1560                .page_lookup
1561                .null_pages
1562                .iter()
1563                .chain(self.page_lookup.all_null_pages.iter())
1564            {
1565                if !existing.contains(&page_id) {
1566                    pages.push(Matches::Some(page_id));
1567                }
1568            }
1569        }
1570
1571        let lazy_index_reader =
1572            LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1573        let page_tasks = pages
1574            .into_iter()
1575            .map(|page_index| {
1576                self.search_page(query, page_index, lazy_index_reader.clone(), metrics)
1577                    .boxed()
1578            })
1579            .collect::<Vec<_>>();
1580        debug!("Searching {} btree pages", page_tasks.len());
1581
1582        // Collect both matching row IDs and null row IDs from all pages
1583        let results: Vec<NullableRowAddrSet> = stream::iter(page_tasks)
1584            // I/O and compute mixed here but important case is index in cache so
1585            // use compute intensive thread count
1586            .buffered(get_num_compute_intensive_cpus())
1587            .try_collect()
1588            .await?;
1589
1590        // Merge matching row IDs
1591        let selection = NullableRowAddrSet::union_all(&results);
1592
1593        Ok(SearchResult::Exact(selection))
1594    }
1595
    /// Reports that this index supports row-id remapping (see `remap`).
    fn can_remap(&self) -> bool {
        true
    }
1599
1600    async fn remap(
1601        &self,
1602        mapping: &HashMap<u64, Option<u64>>,
1603        dest_store: &dyn IndexStore,
1604    ) -> Result<CreatedIndex> {
1605        // (part_id, path)
1606        // The part_id is None for a basic index
1607        // For a range-based index we use Some(0), Some(1), ...
1608        //   even if those weren't the original part ids
1609        let part_page_files: Vec<(Option<u32>, &str)> =
1610            if let Some(ranges_to_files) = &self.ranges_to_files {
1611                // Range-based Index: Directly collect references to the file paths.
1612                ranges_to_files
1613                    .iter()
1614                    .enumerate()
1615                    .map(|(part_id, (_, (path, _)))| (Some(part_id as u32), path.as_str()))
1616                    .collect()
1617            } else {
1618                // Basic Index: There is only one source page file.
1619                vec![(None, BTREE_PAGES_NAME)]
1620            };
1621
1622        let mapping = Arc::new(mapping.clone());
1623        let train_schema = Arc::new(self.train_schema());
1624
1625        // TODO: Could potentially parallelize this across parts, unclear it would be worth it
1626        for (part_id, page_file) in part_page_files {
1627            // Retrain on the remapped pages
1628            let sub_index_reader = self.store.open_index_file(page_file).await?;
1629            let mapping = mapping.clone();
1630
1631            let train_schema_clone = train_schema.clone();
1632            let train_schema = train_schema.clone();
1633
1634            let remapped_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1635                .await
1636                .buffered(self.store.io_parallelism())
1637                .map_err(DataFusionError::from)
1638                .and_then(move |batch| {
1639                    // Remap the batch and then convert from the serialized schema to the training input schema
1640                    let remapped =
1641                        FlatIndex::remap_batch(batch, &mapping).map_err(DataFusionError::from);
1642                    let with_train_schema = remapped.and_then(|batch| {
1643                        RecordBatch::try_new(train_schema.clone(), batch.columns().to_vec())
1644                            .map_err(DataFusionError::from)
1645                    });
1646                    std::future::ready(with_train_schema)
1647                });
1648
1649            let remapped_stream = Box::pin(RecordBatchStreamAdapter::new(
1650                train_schema_clone,
1651                remapped_stream,
1652            ));
1653
1654            train_btree_index(remapped_stream, dest_store, self.batch_size, None, part_id).await?;
1655        }
1656
1657        if let Some(ranges_to_files) = &self.ranges_to_files {
1658            let num_parts = ranges_to_files.len();
1659            // Merge the lookups if we are a range-based index
1660            let page_files = (0..num_parts)
1661                .map(|part_id| part_page_data_file_path((part_id as u64) << 32))
1662                .collect::<Vec<_>>();
1663            let lookup_files = (0..num_parts)
1664                .map(|part_id| part_lookup_file_path((part_id as u64) << 32))
1665                .collect::<Vec<_>>();
1666            merge_metadata_files(
1667                dest_store,
1668                &page_files,
1669                &lookup_files,
1670                None,
1671                noop_progress(),
1672            )
1673            .await?;
1674        }
1675
1676        Ok(CreatedIndex {
1677            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1678                .unwrap(),
1679            index_version: BTREE_INDEX_VERSION,
1680            files: Some(dest_store.list_files_with_sizes().await?),
1681        })
1682    }
1683
1684    async fn update(
1685        &self,
1686        new_data: SendableRecordBatchStream,
1687        dest_store: &dyn IndexStore,
1688        old_data_filter: Option<OldIndexDataFilter>,
1689    ) -> Result<CreatedIndex> {
1690        // Merge the existing index data with the new data and then retrain the index on the merged stream
1691        let merged_data_source = self
1692            .clone()
1693            .combine_old_new(new_data, self.batch_size, old_data_filter)
1694            .await?;
1695        train_btree_index(merged_data_source, dest_store, self.batch_size, None, None).await?;
1696
1697        Ok(CreatedIndex {
1698            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1699                .unwrap(),
1700            index_version: BTREE_INDEX_VERSION,
1701            files: Some(dest_store.list_files_with_sizes().await?),
1702        })
1703    }
1704
1705    fn update_criteria(&self) -> UpdateCriteria {
1706        UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
1707    }
1708
1709    fn derive_index_params(&self) -> Result<ScalarIndexParams> {
1710        let params = serde_json::to_value(BTreeParameters {
1711            zone_size: Some(self.batch_size),
1712            range_id: None,
1713        })?;
1714        Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::BTree).with_params(&params))
1715    }
1716}
1717
/// Per-page summary produced by `analyze_batch`.
struct BatchStats {
    /// Value at the first row of the batch (the training input is expected to be sorted)
    min: ScalarValue,
    /// Value at the last row of the batch
    max: ScalarValue,
    /// Number of null values in the batch
    null_count: u32,
}
1723
1724fn analyze_batch(batch: &RecordBatch) -> Result<BatchStats> {
1725    let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1726    if values.is_empty() {
1727        return Err(Error::internal(
1728            "received an empty batch in btree training".to_string(),
1729        ));
1730    }
1731    let min = ScalarValue::try_from_array(&values, 0)
1732        .map_err(|e| Error::internal(format!("failed to get min value from batch: {}", e)))?;
1733    let max = ScalarValue::try_from_array(&values, values.len() - 1)
1734        .map_err(|e| Error::internal(format!("failed to get max value from batch: {}", e)))?;
1735
1736    Ok(BatchStats {
1737        min,
1738        max,
1739        null_count: values.null_count() as u32,
1740    })
1741}
1742
/// A trait that must be implemented by anything that wishes to act as a btree subindex
#[async_trait]
pub trait BTreeSubIndex: Debug + Send + Sync + DeepSizeOf {
    /// Trains the subindex on a single batch of data and serializes it to Arrow
    async fn train(&self, batch: RecordBatch) -> Result<RecordBatch>;

    /// Deserialize a subindex from Arrow
    async fn load_subindex(&self, serialized: RecordBatch) -> Result<Arc<dyn ScalarIndex>>;

    /// Retrieve the data used to originally train this page
    ///
    /// In order to perform an update we need to merge the old data in with the new data, which
    /// means we need to access the old data.  Right now this is convenient for flat indices but
    /// we may need to take a different approach if we ever decide to use a sub-index other than
    /// flat
    async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch>;

    /// The schema of the subindex when serialized to Arrow
    fn schema(&self) -> &Arc<Schema>;

    /// Given a serialized page, deserialize it, remap the row ids, and re-serialize it
    ///
    /// `mapping` is keyed by row id; its values are optional replacement row ids.
    async fn remap_subindex(
        &self,
        serialized: RecordBatch,
        mapping: &HashMap<u64, Option<u64>>,
    ) -> Result<RecordBatch>;
}
1770
/// Bookkeeping for one page written by `train_btree_page`.
struct EncodedBatch {
    /// Statistics of the batch that was written as this page
    stats: BatchStats,
    /// Index of the page, as passed to `train_btree_page` (`batch_idx`)
    page_number: u32,
}
1775
1776async fn train_btree_page(
1777    batch: RecordBatch,
1778    batch_idx: u32,
1779    writer: &mut dyn IndexWriter,
1780    schema: Arc<Schema>,
1781) -> Result<EncodedBatch> {
1782    let stats = analyze_batch(&batch)?;
1783
1784    // Renames from value/_rowid to values/ids
1785    let trained = RecordBatch::try_new(
1786        schema.clone(),
1787        vec![
1788            batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?.clone(),
1789            batch.column_by_name(ROW_ID).expect_ok()?.clone(),
1790        ],
1791    )?;
1792
1793    writer.write_record_batch(trained).await?;
1794    Ok(EncodedBatch {
1795        stats,
1796        page_number: batch_idx,
1797    })
1798}
1799
1800fn btree_stats_as_batch(stats: Vec<EncodedBatch>, value_type: &DataType) -> Result<RecordBatch> {
1801    let mins = if stats.is_empty() {
1802        new_empty_array(value_type)
1803    } else {
1804        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?
1805    };
1806    let maxs = if stats.is_empty() {
1807        new_empty_array(value_type)
1808    } else {
1809        ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?
1810    };
1811    let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count));
1812    let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number));
1813
1814    let schema = Arc::new(Schema::new(vec![
1815        // min and max can be null if the entire batch is null values
1816        Field::new("min", mins.data_type().clone(), true),
1817        Field::new("max", maxs.data_type().clone(), true),
1818        Field::new("null_count", null_counts.data_type().clone(), false),
1819        Field::new("page_idx", page_numbers.data_type().clone(), false),
1820    ]));
1821
1822    let columns = vec![
1823        mins,
1824        maxs,
1825        Arc::new(null_counts) as Arc<dyn Array>,
1826        Arc::new(page_numbers) as Arc<dyn Array>,
1827    ];
1828
1829    Ok(RecordBatch::try_new(schema, columns)?)
1830}
1831
1832/// Train a btree index from a stream of sorted page-size batches of values and row ids
1833pub async fn train_btree_index(
1834    batches_source: SendableRecordBatchStream,
1835    index_store: &dyn IndexStore,
1836    batch_size: u64,
1837    fragment_ids: Option<Vec<u32>>,
1838    range_id: Option<u32>,
1839) -> Result<()> {
1840    // Create `partition_id` for distributed index building.
1841    // This ID serves as a high-level mask (first 32 bits of a u64) to ensure
1842    // that index partitions generated by different workers do not conflict.
1843    // Lance supports two strategies for distributed training: fragment-based and range-based.
1844    let partition_id = fragment_ids
1845        .as_ref()
1846        // --- Fragment-based Partitioning ---
1847        // Used when training sub-indexes on a fragment-level-split basis. The `partition_id` is
1848        // derived from `fragment_ids` to associate the index pages with their source fragment.
1849        .and_then(|frag_ids| frag_ids.first())
1850        .map(|&first_frag_id| (first_frag_id as u64) << 32)
1851        // --- Range-based Partitioning ---
1852        // Built upon data globally sorted by an external compute engine. The `range_id` creates
1853        // a unique name for the index pages generated by each worker.
1854        .or_else(|| range_id.map(|id| (id as u64) << 32));
1855
1856    let flat_schema = Arc::new(Schema::new(vec![
1857        Field::new(
1858            BTREE_VALUES_COLUMN,
1859            batches_source.schema().field(0).data_type().clone(),
1860            true,
1861        ),
1862        Field::new(BTREE_IDS_COLUMN, DataType::UInt64, false),
1863    ]));
1864
1865    let mut sub_index_file = match partition_id {
1866        None => {
1867            index_store
1868                .new_index_file(BTREE_PAGES_NAME, flat_schema.clone())
1869                .await?
1870        }
1871        Some(partition_id) => {
1872            index_store
1873                .new_index_file(
1874                    part_page_data_file_path(partition_id).as_str(),
1875                    flat_schema.clone(),
1876                )
1877                .await?
1878        }
1879    };
1880
1881    let mut encoded_batches = Vec::new();
1882    let mut batch_idx = 0;
1883
1884    let value_type = batches_source
1885        .schema()
1886        .field_with_name(VALUE_COLUMN_NAME)?
1887        .data_type()
1888        .clone();
1889
1890    let mut batches_source = chunk_concat_stream(batches_source, batch_size as usize);
1891
1892    while let Some(batch) = batches_source.try_next().await? {
1893        encoded_batches.push(
1894            train_btree_page(
1895                batch,
1896                batch_idx,
1897                sub_index_file.as_mut(),
1898                flat_schema.clone(),
1899            )
1900            .await?,
1901        );
1902        batch_idx += 1;
1903    }
1904    sub_index_file.finish().await?;
1905    let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?;
1906    let mut file_schema = record_batch.schema().as_ref().clone();
1907    file_schema
1908        .metadata
1909        .insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
1910    file_schema.metadata.insert(
1911        RANGE_PARTITIONED_META_KEY.to_string(),
1912        range_id.is_some().to_string(),
1913    );
1914    let mut btree_index_file = match partition_id {
1915        None => {
1916            index_store
1917                .new_index_file(BTREE_LOOKUP_NAME, Arc::new(file_schema))
1918                .await?
1919        }
1920        Some(partition_id) => {
1921            index_store
1922                .new_index_file(
1923                    part_lookup_file_path(partition_id).as_str(),
1924                    Arc::new(file_schema),
1925                )
1926                .await?
1927        }
1928    };
1929    btree_index_file.write_record_batch(record_batch).await?;
1930    btree_index_file.finish().await?;
1931    Ok(())
1932}
1933
1934pub async fn merge_index_files(
1935    object_store: &ObjectStore,
1936    index_dir: &Path,
1937    store: Arc<dyn IndexStore>,
1938    batch_readhead: Option<usize>,
1939    progress: Arc<dyn IndexBuildProgress>,
1940) -> Result<()> {
1941    // List all partition page / lookup files in the index directory
1942    let (part_page_files, part_lookup_files) =
1943        list_page_lookup_files(object_store, index_dir).await?;
1944    merge_metadata_files(
1945        store.as_ref(),
1946        &part_page_files,
1947        &part_lookup_files,
1948        batch_readhead,
1949        progress,
1950    )
1951    .await
1952}
1953
1954/// List and filter files from the index directory
1955/// Returns (page_files, lookup_files)
1956async fn list_page_lookup_files(
1957    object_store: &ObjectStore,
1958    index_dir: &Path,
1959) -> Result<(Vec<String>, Vec<String>)> {
1960    let mut part_page_files = Vec::new();
1961    let mut part_lookup_files = Vec::new();
1962
1963    let mut list_stream = object_store.list(Some(index_dir.clone()));
1964
1965    while let Some(item) = list_stream.next().await {
1966        match item {
1967            Ok(meta) => {
1968                let file_name = meta.location.filename().unwrap_or_default();
1969                // Filter files matching the pattern part_*_page_data.lance
1970                if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") {
1971                    part_page_files.push(file_name.to_string());
1972                }
1973                // Filter files matching the pattern part_*_page_lookup.lance
1974                if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") {
1975                    part_lookup_files.push(file_name.to_string());
1976                }
1977            }
1978            Err(_) => continue,
1979        }
1980    }
1981
1982    if part_page_files.is_empty() || part_lookup_files.is_empty() {
1983        return Err(Error::internal(format!(
1984            "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})",
1985            index_dir,
1986            part_page_files.len(),
1987            part_lookup_files.len()
1988        )));
1989    }
1990
1991    Ok((part_page_files, part_lookup_files))
1992}
1993
1994/// Merge multiple partition page / lookup files into a complete metadata file
1995///
1996/// In a distributed environment, each worker node writes partition page / lookup file for the partitions it processes,
1997/// and this function merges these files into a final metadata file.
1998/// - For fragment-based indices, it performs a full K-way sort-merge of page files to create new global page and lookup files.
1999/// - For range-based indices, it concatenates lookup files, as data is already globally sorted.
2000async fn merge_metadata_files(
2001    store: &dyn IndexStore,
2002    part_page_files: &[String],
2003    part_lookup_files: &[String],
2004    batch_readhead: Option<usize>,
2005    progress: Arc<dyn IndexBuildProgress>,
2006) -> Result<()> {
2007    if part_lookup_files.is_empty() || part_page_files.is_empty() {
2008        return Err(Error::internal(
2009            "No partition files provided for merging".to_string(),
2010        ));
2011    }
2012
2013    // Step 1: Create lookup map for page files by partition ID
2014    if part_lookup_files.len() != part_page_files.len() {
2015        return Err(Error::internal(format!(
2016            "Number of partition lookup files ({}) does not match number of partition page files ({})",
2017            part_lookup_files.len(),
2018            part_page_files.len()
2019        )));
2020    }
2021    let mut page_files_map = HashMap::new();
2022    for page_file in part_page_files {
2023        let partition_id = extract_partition_id(page_file)?;
2024        page_files_map.insert(partition_id, page_file);
2025    }
2026
2027    // Step 2: Validate that all lookup files have corresponding page files
2028    for lookup_file in part_lookup_files {
2029        let partition_id = extract_partition_id(lookup_file)?;
2030        if !page_files_map.contains_key(&partition_id) {
2031            return Err(Error::internal(format!(
2032                "No corresponding page file found for lookup file: {} (partition_id: {})",
2033                lookup_file, partition_id
2034            )));
2035        }
2036    }
2037
2038    // Step 3: Extract shared metadata and generate lookup_schema
2039    let first_lookup_reader = store.open_index_file(&part_lookup_files[0]).await?;
2040    let batch_size = first_lookup_reader
2041        .schema()
2042        .metadata
2043        .get(BATCH_SIZE_META_KEY)
2044        .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
2045        .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
2046    let range_partitioned = first_lookup_reader
2047        .schema()
2048        .metadata
2049        .get(RANGE_PARTITIONED_META_KEY)
2050        .map(|bs| bs.parse().unwrap_or(DEFAULT_RANGE_PARTITIONED))
2051        .unwrap_or(DEFAULT_RANGE_PARTITIONED);
2052
2053    // Get the value type from lookup schema (min column)
2054    let value_type = first_lookup_reader
2055        .schema()
2056        .fields
2057        .first()
2058        .unwrap()
2059        .data_type();
2060
2061    let mut metadata = HashMap::new();
2062    metadata.insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
2063    let lookup_schema = Arc::new(Schema::new(vec![
2064        Field::new("min", value_type.clone(), true),
2065        Field::new("max", value_type.clone(), true),
2066        Field::new("null_count", DataType::UInt32, false),
2067        Field::new("page_idx", DataType::UInt32, false),
2068    ]));
2069
2070    // Step 4: Merge pages and lookups and generate new index files
2071    if range_partitioned {
2072        merge_range_partitioned_lookups(
2073            store,
2074            part_lookup_files,
2075            lookup_schema,
2076            metadata,
2077            batch_size,
2078            batch_readhead,
2079            progress,
2080        )
2081        .await
2082    } else {
2083        merge_pages_and_lookups(
2084            store,
2085            part_page_files,
2086            part_lookup_files,
2087            &page_files_map,
2088            lookup_schema,
2089            metadata,
2090            batch_size,
2091            batch_readhead,
2092            progress,
2093        )
2094        .await
2095    }
2096}
2097
2098/// Merges multiple lookup files from a range-partitioned index into a single, unified lookup file.
2099///
2100/// A range-partitioned B-Tree index creates a separate `page_lookup.lance` file for
2101/// each partition. Each of these files has its own local `page_idx` column, where the indices
2102/// start from 0.
2103///
2104/// This function's primary goal is to combine these separate files into one large
2105/// `page_lookup.lance` file. To do this, it remaps the local `page_idx` from each partition
2106/// file into a contiguous, global `page_idx` space. It processes partition files sequentially,
2107/// calculating an offset based on the number of pages in all previously processed partitions.
2108///
2109/// **The reverse operation occurs when the B-Tree index is loaded**: a global `page_idx` is translated
2110/// back into a `(partition_id, local_page_idx)` tuple. This translation is made possible by the
2111/// metadata stored under the `PAGE_NUM_PER_RANGE_PARTITION_META_KEY`, which this function
2112/// is responsible for writing.
2113///
2114/// # Examples
2115///
2116/// If we have two partition lookup files:
2117/// - `part_0_page_lookup.lance`: Contains 3 pages. Its `page_idx` column is `[0, 1, 2]`.
2118/// - `part_1_page_lookup.lance`: Contains 4 pages. Its `page_idx` column is `[0, 1, 2, 3]`.
2119///
2120/// The merge process works as follows:
2121/// 1. Process `part_0`: The offset is 0. The indices `[0, 1, 2]` are written as is.
2122/// 2. Process `part_1`: The offset is 3 and the local indices `[0, 1, 2, 3]` are remapped
2123///    by adding the offset, resulting in `[3, 4, 5, 6]`.
2124///
2125/// The final, merged `_page_lookup.lance` will have a single `page_idx` column containing
2126/// `[0, 1, 2, 3, 4, 5, 6]`.
2127async fn merge_range_partitioned_lookups(
2128    store: &dyn IndexStore,
2129    part_lookup_files: &[String],
2130    lookup_schema: Arc<Schema>,
2131    mut metadata: HashMap<String, String>,
2132    batch_size: u64,
2133    batch_readhead: Option<usize>,
2134    progress: Arc<dyn IndexBuildProgress>,
2135) -> Result<()> {
2136    let sorted_part_lookup_files = sort_files_by_partition_id(part_lookup_files)?;
2137    let mut lookup_file = store
2138        .new_index_file(BTREE_LOOKUP_NAME, lookup_schema)
2139        .await?;
2140
2141    // stores partition id and the number of pages in that partition
2142    let mut pages_per_file: Vec<(u64, u32)> = Vec::with_capacity(sorted_part_lookup_files.len());
2143    let mut num_pages_written = 0u32;
2144
2145    progress
2146        .stage_start(
2147            "merge_lookups",
2148            Some(sorted_part_lookup_files.len() as u64),
2149            "files",
2150        )
2151        .await?;
2152
2153    for (idx, (part_id, part_lookup_file)) in sorted_part_lookup_files.into_iter().enumerate() {
2154        let lookup_reader = store.open_index_file(&part_lookup_file).await?;
2155        let reader_stream = IndexReaderStream::new(lookup_reader.clone(), batch_size).await;
2156        let mut stream = reader_stream.buffered(batch_readhead.unwrap_or(1)).boxed();
2157        while let Some(batch) = stream.next().await {
2158            let original_batch = batch?;
2159            let modified_batch = add_offset_to_page_idx(&original_batch, num_pages_written)?;
2160            lookup_file.write_record_batch(modified_batch).await?;
2161        }
2162        pages_per_file.push((part_id, lookup_reader.num_rows() as u32));
2163        num_pages_written += lookup_reader.num_rows() as u32;
2164        progress
2165            .stage_progress("merge_lookups", idx as u64 + 1)
2166            .await?;
2167    }
2168
2169    metadata.insert(RANGE_PARTITIONED_META_KEY.to_string(), "true".to_string());
2170    metadata.insert(
2171        PAGE_NUM_PER_RANGE_PARTITION_META_KEY.to_string(),
2172        serde_json::to_string(&pages_per_file)?,
2173    );
2174
2175    lookup_file.finish_with_metadata(metadata).await?;
2176    progress.stage_complete("merge_lookups").await?;
2177
2178    // In this mode, we only clean up lookup files, and page files are untouched.
2179    cleanup_partition_files(store, part_lookup_files, &[]).await;
2180    Ok(())
2181}
2182
2183/// Merges partition files using a K-way sort-merge algorithm.
2184///
2185/// This function assumes its inputs have been pre-validated. It reads from all
2186/// partitioned page files simultaneously, merges them into a single sorted stream,
2187/// writes a new global page file, and generates a corresponding global lookup file.
2188#[allow(clippy::too_many_arguments)]
2189async fn merge_pages_and_lookups(
2190    store: &dyn IndexStore,
2191    part_page_files: &[String],
2192    part_lookup_files: &[String],
2193    page_files_map: &HashMap<u64, &String>,
2194    lookup_schema: Arc<Schema>,
2195    metadata: HashMap<String, String>,
2196    batch_size: u64,
2197    batch_readhead: Option<usize>,
2198    progress: Arc<dyn IndexBuildProgress>,
2199) -> Result<()> {
2200    // Create a new global page file
2201    let partition_id = extract_partition_id(part_lookup_files[0].as_str())?;
2202    let page_file = page_files_map.get(&partition_id).unwrap();
2203    let page_reader = store.open_index_file(page_file).await?;
2204    let page_schema = page_reader.schema().clone();
2205
2206    let arrow_schema = Arc::new(Schema::from(&page_schema));
2207    let mut page_file = store
2208        .new_index_file(BTREE_PAGES_NAME, arrow_schema.clone())
2209        .await?;
2210    progress.stage_start("merge_pages", None, "pages").await?;
2211    let lookup_entries = merge_pages(
2212        part_lookup_files,
2213        page_files_map,
2214        store,
2215        batch_size,
2216        &mut page_file,
2217        arrow_schema.clone(),
2218        batch_readhead,
2219        progress.clone(),
2220    )
2221    .await?;
2222    page_file.finish().await?;
2223    progress.stage_complete("merge_pages").await?;
2224
2225    let lookup_batch = RecordBatch::try_new(
2226        lookup_schema.clone(),
2227        vec![
2228            ScalarValue::iter_to_array(lookup_entries.iter().map(|(min, _, _, _)| min.clone()))?,
2229            ScalarValue::iter_to_array(lookup_entries.iter().map(|(_, max, _, _)| max.clone()))?,
2230            Arc::new(UInt32Array::from_iter_values(
2231                lookup_entries
2232                    .iter()
2233                    .map(|(_, _, null_count, _)| *null_count),
2234            )),
2235            Arc::new(UInt32Array::from_iter_values(
2236                lookup_entries.iter().map(|(_, _, _, page_idx)| *page_idx),
2237            )),
2238        ],
2239    )?;
2240    let mut lookup_file = store
2241        .new_index_file(BTREE_LOOKUP_NAME, lookup_schema)
2242        .await?;
2243    progress
2244        .stage_start("write_lookup_file", Some(1), "files")
2245        .await?;
2246    lookup_file.write_record_batch(lookup_batch).await?;
2247    lookup_file.finish_with_metadata(metadata).await?;
2248    progress.stage_progress("write_lookup_file", 1).await?;
2249    progress.stage_complete("write_lookup_file").await?;
2250
2251    // After successfully writing the merged files, delete all partition files
2252    // Only perform deletion after files are successfully written, ensuring debug information is not lost in case of failure
2253    cleanup_partition_files(store, part_lookup_files, part_page_files).await;
2254
2255    Ok(())
2256}
2257
2258// Adjust local_page_idx_ in each look-up file to create a contiguous global_page_idx
2259fn add_offset_to_page_idx(batch: &RecordBatch, offset: u32) -> Result<RecordBatch> {
2260    let (page_idx_pos, _) = batch.schema().column_with_name("page_idx").ok_or_else(|| {
2261        Error::internal("Column 'page_idx' not found in RecordBatch schema".to_string())
2262    })?;
2263    let page_idx_array = batch
2264        .column(page_idx_pos)
2265        .as_any()
2266        .downcast_ref::<UInt32Array>()
2267        .ok_or_else(|| {
2268            Error::internal("Failed to downcast 'page_idx' column to UInt32Array".to_string())
2269        })?;
2270    let offset_array = UInt32Array::from(vec![offset; page_idx_array.len()]);
2271    let new_page_idx_array_ref = add(page_idx_array, &offset_array)?;
2272    let mut new_columns = batch.columns().to_vec();
2273    new_columns[page_idx_pos] = new_page_idx_array_ref;
2274    let new_batch = RecordBatch::try_new(batch.schema(), new_columns)?;
2275    Ok(new_batch)
2276}
2277
2278/// Merge pages using Datafusion's SortPreservingMergeExec
2279/// which implements a K-way merge algorithm with fixed-size output batches
2280#[allow(clippy::too_many_arguments)]
2281async fn merge_pages(
2282    part_lookup_files: &[String],
2283    page_files_map: &HashMap<u64, &String>,
2284    store: &dyn IndexStore,
2285    batch_size: u64,
2286    page_file: &mut Box<dyn IndexWriter>,
2287    arrow_schema: Arc<Schema>,
2288    batch_readhead: Option<usize>,
2289    progress: Arc<dyn IndexBuildProgress>,
2290) -> Result<Vec<(ScalarValue, ScalarValue, u32, u32)>> {
2291    let mut lookup_entries = Vec::new();
2292    let mut page_idx = 0u32;
2293
2294    debug!(
2295        "Starting SortPreservingMerge with {} partitions",
2296        part_lookup_files.len()
2297    );
2298
2299    let value_field = arrow_schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
2300    let row_id_field = arrow_schema.field(1).clone().with_name(ROW_ID);
2301    let stream_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
2302
2303    // Create execution plans for each stream
2304    let mut inputs: Vec<Arc<dyn ExecutionPlan>> = Vec::new();
2305    for lookup_file in part_lookup_files {
2306        let partition_id = extract_partition_id(lookup_file)?;
2307        let page_file_name = (*page_files_map.get(&partition_id).ok_or_else(|| {
2308            Error::internal(format!(
2309                "Page file not found for partition ID: {}",
2310                partition_id
2311            ))
2312        })?)
2313        .clone();
2314
2315        let reader = store.open_index_file(&page_file_name).await?;
2316
2317        let reader_stream = IndexReaderStream::new(reader, batch_size).await;
2318
2319        let stream = reader_stream
2320            .map(|fut| fut.map_err(DataFusionError::from))
2321            .buffered(batch_readhead.unwrap_or(1))
2322            .boxed();
2323
2324        let sendable_stream =
2325            Box::pin(RecordBatchStreamAdapter::new(stream_schema.clone(), stream));
2326        inputs.push(Arc::new(OneShotExec::new(sendable_stream)));
2327    }
2328
2329    // Create Union execution plan to combine all partitions
2330    let union_inputs = UnionExec::try_new(inputs)?;
2331
2332    // Create SortPreservingMerge execution plan
2333    let value_column_index = stream_schema.index_of(VALUE_COLUMN_NAME)?;
2334    let sort_expr = PhysicalSortExpr {
2335        expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
2336        options: SortOptions {
2337            descending: false,
2338            nulls_first: true,
2339        },
2340    };
2341
2342    let merge_exec = Arc::new(SortPreservingMergeExec::new(
2343        [sort_expr].into(),
2344        union_inputs,
2345    ));
2346
2347    let unchunked = execute_plan(
2348        merge_exec,
2349        LanceExecutionOptions {
2350            use_spilling: false,
2351            ..Default::default()
2352        },
2353    )?;
2354
2355    // Use chunk_concat_stream to ensure fixed batch sizes
2356    let mut chunked_stream = chunk_concat_stream(unchunked, batch_size as usize);
2357
2358    // Process chunked stream
2359    while let Some(batch) = chunked_stream.try_next().await? {
2360        let writer_batch = RecordBatch::try_new(
2361            arrow_schema.clone(),
2362            vec![batch.column(0).clone(), batch.column(1).clone()],
2363        )?;
2364
2365        page_file.write_record_batch(writer_batch).await?;
2366
2367        let min_val = ScalarValue::try_from_array(batch.column(0), 0)?;
2368        let max_val = ScalarValue::try_from_array(batch.column(0), batch.num_rows() - 1)?;
2369        let null_count = batch.column(0).null_count() as u32;
2370
2371        lookup_entries.push((min_val, max_val, null_count, page_idx));
2372        page_idx += 1;
2373        progress
2374            .stage_progress("merge_pages", page_idx as u64)
2375            .await?;
2376    }
2377
2378    Ok(lookup_entries)
2379}
2380
2381// Sorts file paths by the partition ID extracted from file name.
2382fn sort_files_by_partition_id(part_files: &[String]) -> Result<Vec<(u64, String)>> {
2383    let mut files_with_ids: Vec<(u64, &String)> = part_files
2384        .iter()
2385        .map(|file| extract_partition_id(file).map(|id| (id, file)))
2386        .collect::<Result<Vec<_>>>()?;
2387
2388    files_with_ids.sort_unstable_by_key(|k| k.0);
2389
2390    let sorted_files = files_with_ids
2391        .into_iter()
2392        .map(|(id, file)| (id, file.clone()))
2393        .collect();
2394
2395    Ok(sorted_files)
2396}
2397
2398/// Extract partition ID from partition file name
2399/// Expected format: "part_{partition_id}_{suffix}.lance"
2400fn extract_partition_id(filename: &str) -> Result<u64> {
2401    if !filename.starts_with("part_") {
2402        return Err(Error::internal(format!(
2403            "Invalid partition file name format: {}",
2404            filename
2405        )));
2406    }
2407
2408    let parts: Vec<&str> = filename.split('_').collect();
2409    if parts.len() < 3 {
2410        return Err(Error::internal(format!(
2411            "Invalid partition file name format: {}",
2412            filename
2413        )));
2414    }
2415
2416    parts[1].parse::<u64>().map_err(|_| {
2417        Error::internal(format!(
2418            "Failed to parse partition ID from filename: {}",
2419            filename
2420        ))
2421    })
2422}
2423
2424/// Clean up partition files after successful merge
2425///
2426/// This function safely deletes partition lookup and page files after a successful merge operation.
2427/// File deletion failures are logged but do not affect the overall success of the merge operation.
2428async fn cleanup_partition_files(
2429    store: &dyn IndexStore,
2430    part_lookup_files: &[String],
2431    part_page_files: &[String],
2432) {
2433    // Clean up partition lookup files
2434    for file_name in part_lookup_files {
2435        cleanup_single_file(
2436            store,
2437            file_name,
2438            "part_",
2439            "_page_lookup.lance",
2440            "partition lookup",
2441        )
2442        .await;
2443    }
2444
2445    // Clean up partition page files
2446    for file_name in part_page_files {
2447        cleanup_single_file(
2448            store,
2449            file_name,
2450            "part_",
2451            "_page_data.lance",
2452            "partition page",
2453        )
2454        .await;
2455    }
2456}
2457
2458/// Helper function to clean up a single partition file
2459///
2460/// Performs safety checks on the filename pattern before attempting deletion.
2461async fn cleanup_single_file(
2462    store: &dyn IndexStore,
2463    file_name: &str,
2464    expected_prefix: &str,
2465    expected_suffix: &str,
2466    file_type: &str,
2467) {
2468    if file_name.starts_with(expected_prefix) && file_name.ends_with(expected_suffix) {
2469        match store.delete_index_file(file_name).await {
2470            Ok(()) => {
2471                debug!("Successfully deleted {} file: {}", file_type, file_name);
2472            }
2473            Err(e) => {
2474                warn!(
2475                    "Failed to delete {} file '{}': {}. \
2476                    This does not affect the merge operation, but may leave \
2477                    partition files that should be cleaned up manually.",
2478                    file_type, file_name, e
2479                );
2480            }
2481        }
2482    } else {
2483        // If the filename doesn't match the expected format, log a warning but don't attempt deletion
2484        warn!(
2485            "Skipping deletion of file '{}' as it does not match the expected \
2486            {} file pattern ({}*{})",
2487            file_name, file_type, expected_prefix, expected_suffix
2488        );
2489    }
2490}
2491
2492pub(crate) fn part_page_data_file_path(partition_id: u64) -> String {
2493    format!("part_{}_{}", partition_id, BTREE_PAGES_NAME)
2494}
2495
2496pub(crate) fn part_lookup_file_path(partition_id: u64) -> String {
2497    format!("part_{}_{}", partition_id, BTREE_LOOKUP_NAME)
2498}
2499
/// A stream that reads the original training data back out of the index
///
/// This is used for updating the index, and for reading partition page files back
/// when partitions are merged.
///
/// The stream does not yield batches directly: every item is a future that reads one
/// batch from `reader` when awaited.
struct IndexReaderStream {
    /// Reader the batches are read from
    reader: Arc<dyn IndexReader>,
    /// Batch size passed to the reader on every read
    batch_size: u64,
    /// Number of batches to yield, as reported by the reader for `batch_size`
    num_batches: u32,
    /// Index of the next batch to hand out
    batch_idx: u32,
}
2509
2510impl IndexReaderStream {
2511    async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
2512        let num_batches = reader.num_batches(batch_size).await;
2513        Self {
2514            reader,
2515            batch_size,
2516            num_batches,
2517            batch_idx: 0,
2518        }
2519    }
2520}
2521
2522impl Stream for IndexReaderStream {
2523    type Item = BoxFuture<'static, Result<RecordBatch>>;
2524
2525    fn poll_next(
2526        self: std::pin::Pin<&mut Self>,
2527        _cx: &mut std::task::Context<'_>,
2528    ) -> std::task::Poll<Option<Self::Item>> {
2529        let this = self.get_mut();
2530        if this.batch_idx >= this.num_batches {
2531            return std::task::Poll::Ready(None);
2532        }
2533        let batch_num = this.batch_idx;
2534        this.batch_idx += 1;
2535        let reader_copy = this.reader.clone();
2536        let batch_size = this.batch_size;
2537        let read_task = async move {
2538            reader_copy
2539                .read_record_batch(batch_num as u64, batch_size)
2540                .await
2541        }
2542        .boxed();
2543        std::task::Poll::Ready(Some(read_task))
2544    }
2545}
2546
/// Parameters for a btree index
///
/// Deserialized from the JSON parameter string given to
/// `BTreeIndexPlugin::new_training_request`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BTreeParameters {
    /// The number of rows to include in each zone
    ///
    /// When `None`, `BTreeIndexPlugin::train_index` falls back to
    /// `DEFAULT_BTREE_BATCH_SIZE`.
    pub zone_size: Option<u64>,

    /// The ordinal ID of a data partition for building a large, distributed BTree index.
    ///
    /// When building an index from multiple, pre-partitioned data chunks (for example,
    /// in a distributed environment), this ID specifies which partition this particular
    /// build operation corresponds to.
    ///
    /// # Data Distribution Requirements
    ///
    /// If this parameter is `Some(id)`, the caller **must** guarantee that the input data
    /// is globally sorted: the input data, when considered as a whole across all
    /// partitions ordered by `range_id`, must be sorted.
    ///
    /// Concretely, this means:
    ///
    /// All values in the data provided for `range_id: N` must be **less than or equal to**
    /// all values in the data for `range_id: N+1`.
    ///
    /// Lance relies on this precondition to ensure the final, merged index is valid and
    /// correctly ordered.
    ///
    /// # `None` Case
    ///
    /// If `range_id` is `None`, a single, monolithic index is built over the provided dataset.
    pub range_id: Option<u32>,
}
2578
/// Training request for a btree index: the parsed index parameters together with the
/// criteria the training data must satisfy.
struct BTreeTrainingRequest {
    /// Parameters the index is trained with
    parameters: BTreeParameters,
    /// Criteria for the training data, returned from `TrainingRequest::criteria`
    criteria: TrainingCriteria,
}
2583
2584impl BTreeTrainingRequest {
2585    pub fn new(parameters: BTreeParameters) -> Self {
2586        Self {
2587            parameters,
2588            // BTree indexes need data sorted by the value column
2589            criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
2590        }
2591    }
2592}
2593
2594impl TrainingRequest for BTreeTrainingRequest {
2595    fn as_any(&self) -> &dyn std::any::Any {
2596        self
2597    }
2598
2599    fn criteria(&self) -> &TrainingCriteria {
2600        &self.criteria
2601    }
2602}
2603
/// Scalar index plugin for btree indexes.
///
/// The plugin carries no state; all behavior lives in its `ScalarIndexPlugin`
/// implementation.
#[derive(Debug, Default)]
pub struct BTreeIndexPlugin;
2606
2607#[async_trait]
2608impl ScalarIndexPlugin for BTreeIndexPlugin {
2609    fn name(&self) -> &str {
2610        "BTree"
2611    }
2612
2613    fn new_training_request(
2614        &self,
2615        params: &str,
2616        field: &Field,
2617    ) -> Result<Box<dyn TrainingRequest>> {
2618        if field.data_type().is_nested() {
2619            return Err(Error::invalid_input_source(
2620                "A btree index can only be created on a non-nested field.".into(),
2621            ));
2622        }
2623
2624        let params = serde_json::from_str::<BTreeParameters>(params)?;
2625        Ok(Box::new(BTreeTrainingRequest::new(params)))
2626    }
2627
2628    fn provides_exact_answer(&self) -> bool {
2629        true
2630    }
2631
2632    fn version(&self) -> u32 {
2633        BTREE_INDEX_VERSION
2634    }
2635
2636    fn new_query_parser(
2637        &self,
2638        index_name: String,
2639        _index_details: &prost_types::Any,
2640    ) -> Option<Box<dyn ScalarQueryParser>> {
2641        Some(Box::new(SargableQueryParser::new(index_name, false)))
2642    }
2643
2644    async fn train_index(
2645        &self,
2646        data: SendableRecordBatchStream,
2647        index_store: &dyn IndexStore,
2648        request: Box<dyn TrainingRequest>,
2649        fragment_ids: Option<Vec<u32>>,
2650        _progress: Arc<dyn crate::progress::IndexBuildProgress>,
2651    ) -> Result<CreatedIndex> {
2652        let request = request
2653            .as_any()
2654            .downcast_ref::<BTreeTrainingRequest>()
2655            .unwrap();
2656        train_btree_index(
2657            data,
2658            index_store,
2659            request
2660                .parameters
2661                .zone_size
2662                .unwrap_or(DEFAULT_BTREE_BATCH_SIZE),
2663            fragment_ids,
2664            request.parameters.range_id,
2665        )
2666        .await?;
2667        Ok(CreatedIndex {
2668            index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
2669                .unwrap(),
2670            index_version: BTREE_INDEX_VERSION,
2671            files: Some(index_store.list_files_with_sizes().await?),
2672        })
2673    }
2674
2675    async fn load_index(
2676        &self,
2677        index_store: Arc<dyn IndexStore>,
2678        _index_details: &prost_types::Any,
2679        frag_reuse_index: Option<Arc<FragReuseIndex>>,
2680        cache: &LanceCache,
2681    ) -> Result<Arc<dyn ScalarIndex>> {
2682        Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
2683    }
2684}
2685
2686#[cfg(test)]
2687mod tests {
2688    use std::sync::atomic::Ordering;
2689    use std::{collections::HashMap, sync::Arc};
2690
2691    use arrow::datatypes::{Float32Type, Float64Type, Int32Type, UInt64Type};
2692    use arrow_array::{FixedSizeListArray, record_batch};
2693    use datafusion::{
2694        execution::{SendableRecordBatchStream, TaskContext},
2695        physical_plan::{ExecutionPlan, sorts::sort::SortExec, stream::RecordBatchStreamAdapter},
2696    };
2697    use datafusion_common::{DataFusionError, ScalarValue};
2698    use datafusion_physical_expr::{PhysicalSortExpr, expressions::col};
2699    use deepsize::DeepSizeOf;
2700    use futures::TryStreamExt;
2701    use futures::stream;
2702    use lance_core::utils::mask::RowSetOps;
2703    use lance_core::utils::tempfile::TempObjDir;
2704    use lance_core::{cache::LanceCache, utils::mask::RowAddrTreeMap};
2705    use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt};
2706    use lance_datagen::{ArrayGeneratorExt, BatchCount, RowCount, array, gen_batch};
2707    use lance_io::object_store::ObjectStore;
2708    use object_store::path::Path;
2709
2710    use crate::metrics::LocalMetricsCollector;
2711    use crate::progress::{IndexBuildProgress, noop_progress};
2712    use crate::{
2713        metrics::NoOpMetricsCollector,
2714        scalar::{
2715            IndexStore, OldIndexDataFilter, SargableQuery, ScalarIndex, SearchResult,
2716            btree::{BTREE_PAGES_NAME, BTreeIndex},
2717            lance_format::LanceIndexStore,
2718        },
2719    };
2720
2721    use super::{
2722        DEFAULT_BTREE_BATCH_SIZE, OrderableScalarValue, part_lookup_file_path,
2723        part_page_data_file_path, train_btree_index,
2724    };
2725
2726    lance_testing::define_stage_event_progress!(
2727        RecordingProgress,
2728        IndexBuildProgress,
2729        lance_core::Result<()>
2730    );
2731    #[test]
2732    fn test_scalar_value_size() {
2733        let size_of_i32 = OrderableScalarValue(ScalarValue::Int32(Some(0))).deep_size_of();
2734        let size_of_many_i32 = OrderableScalarValue(ScalarValue::FixedSizeList(Arc::new(
2735            FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2736                vec![Some(vec![Some(0); 128])],
2737                128,
2738            ),
2739        )))
2740        .deep_size_of();
2741
2742        // deep_size_of should account for the rust type overhead
2743        assert!(size_of_i32 > 4);
2744        assert!(size_of_many_i32 > 128 * 4);
2745    }
2746
2747    #[tokio::test]
2748    async fn test_null_ids() {
2749        let tmpdir = TempObjDir::default();
2750        let test_store = Arc::new(LanceIndexStore::new(
2751            Arc::new(ObjectStore::local()),
2752            tmpdir.clone(),
2753            Arc::new(LanceCache::no_cache()),
2754        ));
2755
2756        // Generate 50,000 rows of random data with 80% nulls
2757        let stream = gen_batch()
2758            .col(
2759                "value",
2760                array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
2761            )
2762            .col("_rowid", array::step::<UInt64Type>())
2763            .into_df_stream(RowCount::from(5000), BatchCount::from(10));
2764
2765        train_btree_index(stream, test_store.as_ref(), 5000, None, None)
2766            .await
2767            .unwrap();
2768
2769        let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2770            .await
2771            .unwrap();
2772
2773        assert_eq!(index.page_lookup.null_pages.len(), 10);
2774
2775        let remap_dir = TempObjDir::default();
2776        let remap_store = Arc::new(LanceIndexStore::new(
2777            Arc::new(ObjectStore::local()),
2778            remap_dir.clone(),
2779            Arc::new(LanceCache::no_cache()),
2780        ));
2781
2782        // Remap with a no-op mapping.  The remapped index should be identical to the original
2783        index
2784            .remap(&HashMap::default(), remap_store.as_ref())
2785            .await
2786            .unwrap();
2787
2788        let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
2789            .await
2790            .unwrap();
2791
2792        assert_eq!(remap_index.page_lookup, index.page_lookup);
2793
2794        let original_pages = test_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2795        let remapped_pages = remap_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2796
2797        assert_eq!(original_pages.num_rows(), remapped_pages.num_rows());
2798
2799        let original_data = original_pages
2800            .read_record_batch(0, original_pages.num_rows() as u64)
2801            .await
2802            .unwrap();
2803        let remapped_data = remapped_pages
2804            .read_record_batch(0, remapped_pages.num_rows() as u64)
2805            .await
2806            .unwrap();
2807
2808        assert_eq!(original_data, remapped_data);
2809    }
2810
2811    #[tokio::test]
2812    async fn test_nan_ordering() {
2813        let tmpdir = TempObjDir::default();
2814        let test_store = Arc::new(LanceIndexStore::new(
2815            Arc::new(ObjectStore::local()),
2816            tmpdir.clone(),
2817            Arc::new(LanceCache::no_cache()),
2818        ));
2819
2820        let values = vec![
2821            0.0,
2822            1.0,
2823            2.0,
2824            3.0,
2825            f64::NAN,
2826            f64::NEG_INFINITY,
2827            f64::INFINITY,
2828        ];
2829
2830        // This is a bit overkill but we've had bugs in the past where DF's sort
2831        // didn't agree with Arrow's sort so we do an end-to-end test here
2832        // and use DF to sort the data like we would in a real dataset.
2833        let data = gen_batch()
2834            .col("value", array::cycle::<Float64Type>(values.clone()))
2835            .col("_rowid", array::step::<UInt64Type>())
2836            .into_df_exec(RowCount::from(10), BatchCount::from(100));
2837        let schema = data.schema();
2838        let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2839        let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2840        let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2841        let stream = break_stream(stream, 64);
2842        let stream = stream.map_err(DataFusionError::from);
2843        let stream =
2844            Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2845
2846        train_btree_index(stream, test_store.as_ref(), 64, None, None)
2847            .await
2848            .unwrap();
2849
2850        let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2851            .await
2852            .unwrap();
2853
2854        for (idx, value) in values.into_iter().enumerate() {
2855            let query = SargableQuery::Equals(ScalarValue::Float64(Some(value)));
2856            let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2857            assert_eq!(
2858                result,
2859                SearchResult::exact(RowAddrTreeMap::from_iter(((idx as u64)..1000).step_by(7)))
2860            );
2861        }
2862    }
2863
2864    #[tokio::test]
2865    async fn test_page_cache() {
2866        let tmpdir = TempObjDir::default();
2867        let test_store = Arc::new(LanceIndexStore::new(
2868            Arc::new(ObjectStore::local()),
2869            tmpdir.clone(),
2870            Arc::new(LanceCache::no_cache()),
2871        ));
2872
2873        let data = gen_batch()
2874            .col("value", array::step::<Float32Type>())
2875            .col("_rowid", array::step::<UInt64Type>())
2876            .into_df_exec(RowCount::from(1000), BatchCount::from(10));
2877        let schema = data.schema();
2878        let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2879        let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2880        let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2881        let stream = break_stream(stream, 64);
2882        let stream = stream.map_err(DataFusionError::from);
2883        let stream =
2884            Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2885
2886        train_btree_index(stream, test_store.as_ref(), 64, None, None)
2887            .await
2888            .unwrap();
2889
2890        let cache = Arc::new(LanceCache::with_capacity(100 * 1024 * 1024));
2891        let index = BTreeIndex::load(test_store, None, cache.as_ref())
2892            .await
2893            .unwrap();
2894
2895        let query = SargableQuery::Equals(ScalarValue::Float32(Some(0.0)));
2896        let metrics = LocalMetricsCollector::default();
2897        let query1 = index.search(&query, &metrics);
2898        let query2 = index.search(&query, &metrics);
2899        tokio::join!(query1, query2).0.unwrap();
2900        assert_eq!(metrics.parts_loaded.load(Ordering::Relaxed), 1);
2901    }
2902
2903    #[tokio::test]
2904    async fn test_like_prefix_search() {
2905        use arrow::datatypes::DataType;
2906        use arrow_array::StringArray;
2907
2908        let tmpdir = TempObjDir::default();
2909        let test_store = Arc::new(LanceIndexStore::new(
2910            Arc::new(ObjectStore::local()),
2911            tmpdir.clone(),
2912            Arc::new(LanceCache::no_cache()),
2913        ));
2914
2915        // Create string data with various prefixes
2916        let values = vec![
2917            "apple",
2918            "app",
2919            "application",
2920            "banana",
2921            "band",
2922            "test_ns$table1",
2923            "test_ns$table2",
2924            "test_ns2$table1",
2925            "test",
2926            "testing",
2927        ];
2928        let row_ids: Vec<u64> = (0..values.len() as u64).collect();
2929
2930        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
2931            arrow::datatypes::Field::new("value", DataType::Utf8, false),
2932            arrow::datatypes::Field::new("_rowid", DataType::UInt64, false),
2933        ]));
2934
2935        let batch = arrow::record_batch::RecordBatch::try_new(
2936            schema.clone(),
2937            vec![
2938                Arc::new(StringArray::from(values.clone())),
2939                Arc::new(arrow_array::UInt64Array::from(row_ids)),
2940            ],
2941        )
2942        .unwrap();
2943
2944        let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2945            schema,
2946            stream::once(async { Ok(batch) }),
2947        ));
2948
2949        train_btree_index(stream, test_store.as_ref(), 100, None, None)
2950            .await
2951            .unwrap();
2952
2953        let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2954            .await
2955            .unwrap();
2956
2957        // Test LikePrefix for "app" - should match "apple", "app", "application" (row ids 0, 1, 2)
2958        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("app".to_string())));
2959        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2960
2961        match &result {
2962            SearchResult::Exact(row_ids) => {
2963                let ids: Vec<u64> = row_ids
2964                    .true_rows()
2965                    .row_addrs()
2966                    .unwrap()
2967                    .map(u64::from)
2968                    .collect();
2969                assert!(ids.contains(&0), "Should contain row 0 (apple)");
2970                assert!(ids.contains(&1), "Should contain row 1 (app)");
2971                assert!(ids.contains(&2), "Should contain row 2 (application)");
2972                assert!(!ids.contains(&3), "Should not contain row 3 (banana)");
2973            }
2974            _ => panic!("Expected Exact result"),
2975        }
2976
2977        // Test LikePrefix for "test_ns$" - should match "test_ns$table1", "test_ns$table2" (row ids 5, 6)
2978        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test_ns$".to_string())));
2979        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2980
2981        match &result {
2982            SearchResult::Exact(row_ids) => {
2983                let ids: Vec<u64> = row_ids
2984                    .true_rows()
2985                    .row_addrs()
2986                    .unwrap()
2987                    .map(u64::from)
2988                    .collect();
2989                assert!(ids.contains(&5), "Should contain row 5 (test_ns$table1)");
2990                assert!(ids.contains(&6), "Should contain row 6 (test_ns$table2)");
2991                assert!(
2992                    !ids.contains(&7),
2993                    "Should not contain row 7 (test_ns2$table1)"
2994                );
2995            }
2996            _ => panic!("Expected Exact result"),
2997        }
2998
2999        // Test LikePrefix for "test" - should match "test", "testing", "test_ns$table1", "test_ns$table2", "test_ns2$table1"
3000        let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test".to_string())));
3001        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3002
3003        match &result {
3004            SearchResult::Exact(row_ids) => {
3005                let ids: Vec<u64> = row_ids
3006                    .true_rows()
3007                    .row_addrs()
3008                    .unwrap()
3009                    .map(u64::from)
3010                    .collect();
3011                assert!(
3012                    ids.contains(&5),
3013                    "Should contain row 5 (test_ns$table1): {:?}",
3014                    ids
3015                );
3016                assert!(
3017                    ids.contains(&6),
3018                    "Should contain row 6 (test_ns$table2): {:?}",
3019                    ids
3020                );
3021                assert!(
3022                    ids.contains(&7),
3023                    "Should contain row 7 (test_ns2$table1): {:?}",
3024                    ids
3025                );
3026                assert!(ids.contains(&8), "Should contain row 8 (test): {:?}", ids);
3027                assert!(
3028                    ids.contains(&9),
3029                    "Should contain row 9 (testing): {:?}",
3030                    ids
3031                );
3032            }
3033            _ => panic!("Expected Exact result"),
3034        }
3035    }
3036
3037    #[tokio::test]
3038    async fn test_like_prefix_search_large_utf8() {
3039        use arrow::datatypes::DataType;
3040        use arrow_array::LargeStringArray;
3041
3042        let tmpdir = TempObjDir::default();
3043        let test_store = Arc::new(LanceIndexStore::new(
3044            Arc::new(ObjectStore::local()),
3045            tmpdir.clone(),
3046            Arc::new(LanceCache::no_cache()),
3047        ));
3048
3049        let values = vec!["apple", "app", "application", "banana"];
3050        let row_ids: Vec<u64> = (0..values.len() as u64).collect();
3051
3052        let schema = Arc::new(arrow::datatypes::Schema::new(vec![
3053            arrow::datatypes::Field::new("value", DataType::LargeUtf8, false),
3054            arrow::datatypes::Field::new("_rowid", DataType::UInt64, false),
3055        ]));
3056
3057        let batch = arrow::record_batch::RecordBatch::try_new(
3058            schema.clone(),
3059            vec![
3060                Arc::new(LargeStringArray::from(values)),
3061                Arc::new(arrow_array::UInt64Array::from(row_ids)),
3062            ],
3063        )
3064        .unwrap();
3065
3066        let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
3067            schema,
3068            stream::once(async { Ok(batch) }),
3069        ));
3070
3071        train_btree_index(stream, test_store.as_ref(), 100, None, None)
3072            .await
3073            .unwrap();
3074
3075        let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
3076            .await
3077            .unwrap();
3078
3079        // Test LikePrefix with LargeUtf8
3080        let query = SargableQuery::LikePrefix(ScalarValue::LargeUtf8(Some("app".to_string())));
3081        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3082
3083        match &result {
3084            SearchResult::Exact(row_ids) => {
3085                let ids: Vec<u64> = row_ids
3086                    .true_rows()
3087                    .row_addrs()
3088                    .unwrap()
3089                    .map(u64::from)
3090                    .collect();
3091                assert!(ids.contains(&0), "Should contain row 0 (apple)");
3092                assert!(ids.contains(&1), "Should contain row 1 (app)");
3093                assert!(ids.contains(&2), "Should contain row 2 (application)");
3094                assert!(!ids.contains(&3), "Should not contain row 3 (banana)");
3095            }
3096            _ => panic!("Expected Exact result"),
3097        }
3098    }
3099
3100    #[tokio::test]
3101    async fn test_fragment_btree_index_consistency() {
3102        // Setup stores for both indexes
3103        let full_tmpdir = TempObjDir::default();
3104        let full_store = Arc::new(LanceIndexStore::new(
3105            Arc::new(ObjectStore::local()),
3106            full_tmpdir.clone(),
3107            Arc::new(LanceCache::no_cache()),
3108        ));
3109
3110        let fragment_tmpdir = TempObjDir::default();
3111        let fragment_store = Arc::new(LanceIndexStore::new(
3112            Arc::new(ObjectStore::local()),
3113            fragment_tmpdir.clone(),
3114            Arc::new(LanceCache::no_cache()),
3115        ));
3116
3117        // Method 1: Build complete index directly using the same data
3118        // Create deterministic data for comparison - use 2 * DEFAULT_BTREE_BATCH_SIZE for testing
3119        let total_count = 2 * DEFAULT_BTREE_BATCH_SIZE;
3120        let full_data_gen = gen_batch()
3121            .col("value", array::step::<Int32Type>())
3122            .col("_rowid", array::step::<UInt64Type>())
3123            .into_df_stream(RowCount::from(total_count / 2), BatchCount::from(2));
3124        let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3125            full_data_gen.schema(),
3126            full_data_gen,
3127        ));
3128
3129        train_btree_index(
3130            full_data_source,
3131            full_store.as_ref(),
3132            DEFAULT_BTREE_BATCH_SIZE,
3133            None,
3134            None,
3135        )
3136        .await
3137        .unwrap();
3138
3139        // Method 2: Build fragment-based index using the same data split into fragments
3140        // Create fragment 1 index - first half of the data (0 to DEFAULT_BTREE_BATCH_SIZE-1)
3141        let half_count = DEFAULT_BTREE_BATCH_SIZE;
3142        let fragment1_gen = gen_batch()
3143            .col("value", array::step::<Int32Type>())
3144            .col("_rowid", array::step::<UInt64Type>())
3145            .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
3146        let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3147            fragment1_gen.schema(),
3148            fragment1_gen,
3149        ));
3150
3151        train_btree_index(
3152            fragment1_data_source,
3153            fragment_store.as_ref(),
3154            DEFAULT_BTREE_BATCH_SIZE,
3155            Some(vec![1]), // fragment_id = 1
3156            None,
3157        )
3158        .await
3159        .unwrap();
3160
3161        // Create fragment 2 index - second half of the data (DEFAULT_BTREE_BATCH_SIZE to 2*DEFAULT_BTREE_BATCH_SIZE-1)
3162        let start_val = DEFAULT_BTREE_BATCH_SIZE as i32;
3163        let end_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3164        let values_second_half: Vec<i32> = (start_val..end_val).collect();
3165        let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
3166        let fragment2_gen = gen_batch()
3167            .col("value", array::cycle::<Int32Type>(values_second_half))
3168            .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
3169            .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
3170        let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3171            fragment2_gen.schema(),
3172            fragment2_gen,
3173        ));
3174
3175        train_btree_index(
3176            fragment2_data_source,
3177            fragment_store.as_ref(),
3178            DEFAULT_BTREE_BATCH_SIZE,
3179            Some(vec![2]), // fragment_id = 2
3180            None,
3181        )
3182        .await
3183        .unwrap();
3184
3185        // Merge the fragment files
3186        let part_page_files = vec![
3187            part_page_data_file_path(1 << 32),
3188            part_page_data_file_path(2 << 32),
3189        ];
3190
3191        let part_lookup_files = vec![
3192            part_lookup_file_path(1 << 32),
3193            part_lookup_file_path(2 << 32),
3194        ];
3195
3196        let progress = Arc::new(RecordingProgress::default());
3197        super::merge_metadata_files(
3198            fragment_store.as_ref(),
3199            &part_page_files,
3200            &part_lookup_files,
3201            Option::from(1usize),
3202            progress.clone(),
3203        )
3204        .await
3205        .unwrap();
3206
3207        let tags = progress
3208            .recorded_events()
3209            .iter()
3210            .map(|(kind, stage, _)| format!("{kind}:{stage}"))
3211            .collect::<Vec<_>>();
3212        let merge_start = tags
3213            .iter()
3214            .position(|e| e == "start:merge_pages")
3215            .expect("missing merge_pages start");
3216        let merge_complete = tags
3217            .iter()
3218            .position(|e| e == "complete:merge_pages")
3219            .expect("missing merge_pages complete");
3220        let lookup_start = tags
3221            .iter()
3222            .position(|e| e == "start:write_lookup_file")
3223            .expect("missing write_lookup_file start");
3224        let lookup_complete = tags
3225            .iter()
3226            .position(|e| e == "complete:write_lookup_file")
3227            .expect("missing write_lookup_file complete");
3228        assert!(merge_start < merge_complete);
3229        assert!(merge_complete < lookup_start);
3230        assert!(lookup_start < lookup_complete);
3231        assert!(
3232            tags.iter().any(|e| e == "progress:merge_pages"),
3233            "expected merge_pages progress callbacks"
3234        );
3235        assert!(
3236            tags.iter().any(|e| e == "progress:write_lookup_file"),
3237            "expected write_lookup_file progress callbacks"
3238        );
3239
3240        // Load both indexes
3241        let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
3242            .await
3243            .unwrap();
3244
3245        let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
3246            .await
3247            .unwrap();
3248
3249        // Test queries one by one to identify the exact problem
3250
3251        // Test 1: Query for value 0 (should be in first page)
3252        let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
3253        let full_result_0 = full_index
3254            .search(&query_0, &NoOpMetricsCollector)
3255            .await
3256            .unwrap();
3257        let merged_result_0 = merged_index
3258            .search(&query_0, &NoOpMetricsCollector)
3259            .await
3260            .unwrap();
3261        assert_eq!(full_result_0, merged_result_0, "Query for value 0 failed");
3262
3263        // Test 2: Query for value in middle of first batch (should be in first page)
3264        let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3265        let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
3266        let full_result_mid_first = full_index
3267            .search(&query_mid_first, &NoOpMetricsCollector)
3268            .await
3269            .unwrap();
3270        let merged_result_mid_first = merged_index
3271            .search(&query_mid_first, &NoOpMetricsCollector)
3272            .await
3273            .unwrap();
3274        assert_eq!(
3275            full_result_mid_first, merged_result_mid_first,
3276            "Query for value {} failed",
3277            mid_first_batch
3278        );
3279
3280        // Test 3: Query for first value in second batch (should be in second page)
3281        let first_second_batch = DEFAULT_BTREE_BATCH_SIZE as i32;
3282        let query_first_second =
3283            SargableQuery::Equals(ScalarValue::Int32(Some(first_second_batch)));
3284        let full_result_first_second = full_index
3285            .search(&query_first_second, &NoOpMetricsCollector)
3286            .await
3287            .unwrap();
3288        let merged_result_first_second = merged_index
3289            .search(&query_first_second, &NoOpMetricsCollector)
3290            .await
3291            .unwrap();
3292        assert_eq!(
3293            full_result_first_second, merged_result_first_second,
3294            "Query for value {} failed",
3295            first_second_batch
3296        );
3297
3298        // Test 4: Query for value in middle of second batch (should be in second page)
3299        let mid_second_batch = (DEFAULT_BTREE_BATCH_SIZE + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3300        let query_mid_second = SargableQuery::Equals(ScalarValue::Int32(Some(mid_second_batch)));
3301
3302        let full_result_mid_second = full_index
3303            .search(&query_mid_second, &NoOpMetricsCollector)
3304            .await
3305            .unwrap();
3306        let merged_result_mid_second = merged_index
3307            .search(&query_mid_second, &NoOpMetricsCollector)
3308            .await
3309            .unwrap();
3310        assert_eq!(
3311            full_result_mid_second, merged_result_mid_second,
3312            "Query for value {} failed",
3313            mid_second_batch
3314        );
3315    }
3316
3317    #[tokio::test]
3318    async fn test_fragment_btree_index_boundary_queries() {
3319        // Setup stores for both indexes
3320        let full_tmpdir = TempObjDir::default();
3321        let full_store = Arc::new(LanceIndexStore::new(
3322            Arc::new(ObjectStore::local()),
3323            full_tmpdir.clone(),
3324            Arc::new(LanceCache::no_cache()),
3325        ));
3326
3327        let fragment_tmpdir = TempObjDir::default();
3328        let fragment_store = Arc::new(LanceIndexStore::new(
3329            Arc::new(ObjectStore::local()),
3330            fragment_tmpdir.clone(),
3331            Arc::new(LanceCache::no_cache()),
3332        ));
3333
3334        // Use 3 * DEFAULT_BTREE_BATCH_SIZE for more comprehensive boundary testing
3335        let total_count = 3 * DEFAULT_BTREE_BATCH_SIZE;
3336
3337        // Method 1: Build complete index directly
3338        let full_data_gen = gen_batch()
3339            .col("value", array::step::<Int32Type>())
3340            .col("_rowid", array::step::<UInt64Type>())
3341            .into_df_stream(RowCount::from(total_count / 3), BatchCount::from(3));
3342        let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3343            full_data_gen.schema(),
3344            full_data_gen,
3345        ));
3346
3347        train_btree_index(
3348            full_data_source,
3349            full_store.as_ref(),
3350            DEFAULT_BTREE_BATCH_SIZE,
3351            None,
3352            None,
3353        )
3354        .await
3355        .unwrap();
3356
3357        // Method 2: Build fragment-based index using 3 fragments
3358        // Fragment 1: 0 to DEFAULT_BTREE_BATCH_SIZE-1
3359        let fragment_size = DEFAULT_BTREE_BATCH_SIZE;
3360        let fragment1_gen = gen_batch()
3361            .col("value", array::step::<Int32Type>())
3362            .col("_rowid", array::step::<UInt64Type>())
3363            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3364        let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3365            fragment1_gen.schema(),
3366            fragment1_gen,
3367        ));
3368
3369        train_btree_index(
3370            fragment1_data_source,
3371            fragment_store.as_ref(),
3372            DEFAULT_BTREE_BATCH_SIZE,
3373            Some(vec![1]),
3374            None,
3375        )
3376        .await
3377        .unwrap();
3378
3379        // Fragment 2: DEFAULT_BTREE_BATCH_SIZE to 2*DEFAULT_BTREE_BATCH_SIZE-1
3380        let start_val2 = DEFAULT_BTREE_BATCH_SIZE as i32;
3381        let end_val2 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3382        let values_fragment2: Vec<i32> = (start_val2..end_val2).collect();
3383        let row_ids_fragment2: Vec<u64> = (start_val2 as u64..end_val2 as u64).collect();
3384        let fragment2_gen = gen_batch()
3385            .col("value", array::cycle::<Int32Type>(values_fragment2))
3386            .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment2))
3387            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3388        let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3389            fragment2_gen.schema(),
3390            fragment2_gen,
3391        ));
3392
3393        train_btree_index(
3394            fragment2_data_source,
3395            fragment_store.as_ref(),
3396            DEFAULT_BTREE_BATCH_SIZE,
3397            Some(vec![2]),
3398            None,
3399        )
3400        .await
3401        .unwrap();
3402
3403        // Fragment 3: 2*DEFAULT_BTREE_BATCH_SIZE to 3*DEFAULT_BTREE_BATCH_SIZE-1
3404        let start_val3 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3405        let end_val3 = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3406        let values_fragment3: Vec<i32> = (start_val3..end_val3).collect();
3407        let row_ids_fragment3: Vec<u64> = (start_val3 as u64..end_val3 as u64).collect();
3408        let fragment3_gen = gen_batch()
3409            .col("value", array::cycle::<Int32Type>(values_fragment3))
3410            .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment3))
3411            .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3412        let fragment3_data_source = Box::pin(RecordBatchStreamAdapter::new(
3413            fragment3_gen.schema(),
3414            fragment3_gen,
3415        ));
3416
3417        train_btree_index(
3418            fragment3_data_source,
3419            fragment_store.as_ref(),
3420            DEFAULT_BTREE_BATCH_SIZE,
3421            Some(vec![3]),
3422            None,
3423        )
3424        .await
3425        .unwrap();
3426
3427        // Merge all fragment files
3428        let part_page_files = vec![
3429            part_page_data_file_path(1 << 32),
3430            part_page_data_file_path(2 << 32),
3431            part_page_data_file_path(3 << 32),
3432        ];
3433
3434        let part_lookup_files = vec![
3435            part_lookup_file_path(1 << 32),
3436            part_lookup_file_path(2 << 32),
3437            part_lookup_file_path(3 << 32),
3438        ];
3439
3440        super::merge_metadata_files(
3441            fragment_store.as_ref(),
3442            &part_page_files,
3443            &part_lookup_files,
3444            Option::from(1usize),
3445            noop_progress(),
3446        )
3447        .await
3448        .unwrap();
3449
3450        // Load both indexes
3451        let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
3452            .await
3453            .unwrap();
3454
3455        let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
3456            .await
3457            .unwrap();
3458
3459        // === Boundary Value Tests ===
3460
3461        // Test 1: Query minimum value (boundary: data start)
3462        let query_min = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
3463        let full_result_min = full_index
3464            .search(&query_min, &NoOpMetricsCollector)
3465            .await
3466            .unwrap();
3467        let merged_result_min = merged_index
3468            .search(&query_min, &NoOpMetricsCollector)
3469            .await
3470            .unwrap();
3471        assert_eq!(
3472            full_result_min, merged_result_min,
3473            "Query for minimum value 0 failed"
3474        );
3475
3476        // Test 2: Query maximum value (boundary: data end)
3477        let max_val = (3 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3478        let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
3479        let full_result_max = full_index
3480            .search(&query_max, &NoOpMetricsCollector)
3481            .await
3482            .unwrap();
3483        let merged_result_max = merged_index
3484            .search(&query_max, &NoOpMetricsCollector)
3485            .await
3486            .unwrap();
3487        assert_eq!(
3488            full_result_max, merged_result_max,
3489            "Query for maximum value {} failed",
3490            max_val
3491        );
3492
3493        // Test 3: Query fragment boundary value (last value of first fragment)
3494        let fragment1_last = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3495        let query_frag1_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment1_last)));
3496        let full_result_frag1_last = full_index
3497            .search(&query_frag1_last, &NoOpMetricsCollector)
3498            .await
3499            .unwrap();
3500        let merged_result_frag1_last = merged_index
3501            .search(&query_frag1_last, &NoOpMetricsCollector)
3502            .await
3503            .unwrap();
3504        assert_eq!(
3505            full_result_frag1_last, merged_result_frag1_last,
3506            "Query for fragment 1 last value {} failed",
3507            fragment1_last
3508        );
3509
3510        // Test 4: Query fragment boundary value (first value of second fragment)
3511        let fragment2_first = DEFAULT_BTREE_BATCH_SIZE as i32;
3512        let query_frag2_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_first)));
3513        let full_result_frag2_first = full_index
3514            .search(&query_frag2_first, &NoOpMetricsCollector)
3515            .await
3516            .unwrap();
3517        let merged_result_frag2_first = merged_index
3518            .search(&query_frag2_first, &NoOpMetricsCollector)
3519            .await
3520            .unwrap();
3521        assert_eq!(
3522            full_result_frag2_first, merged_result_frag2_first,
3523            "Query for fragment 2 first value {} failed",
3524            fragment2_first
3525        );
3526
3527        // Test 5: Query fragment boundary value (last value of second fragment)
3528        let fragment2_last = (2 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3529        let query_frag2_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_last)));
3530        let full_result_frag2_last = full_index
3531            .search(&query_frag2_last, &NoOpMetricsCollector)
3532            .await
3533            .unwrap();
3534        let merged_result_frag2_last = merged_index
3535            .search(&query_frag2_last, &NoOpMetricsCollector)
3536            .await
3537            .unwrap();
3538        assert_eq!(
3539            full_result_frag2_last, merged_result_frag2_last,
3540            "Query for fragment 2 last value {} failed",
3541            fragment2_last
3542        );
3543
3544        // Test 6: Query fragment boundary value (first value of third fragment)
3545        let fragment3_first = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3546        let query_frag3_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment3_first)));
3547        let full_result_frag3_first = full_index
3548            .search(&query_frag3_first, &NoOpMetricsCollector)
3549            .await
3550            .unwrap();
3551        let merged_result_frag3_first = merged_index
3552            .search(&query_frag3_first, &NoOpMetricsCollector)
3553            .await
3554            .unwrap();
3555        assert_eq!(
3556            full_result_frag3_first, merged_result_frag3_first,
3557            "Query for fragment 3 first value {} failed",
3558            fragment3_first
3559        );
3560
3561        // === Non-existent Value Tests ===
3562
3563        // Test 7: Query value below minimum
3564        let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
3565        let full_result_below = full_index
3566            .search(&query_below_min, &NoOpMetricsCollector)
3567            .await
3568            .unwrap();
3569        let merged_result_below = merged_index
3570            .search(&query_below_min, &NoOpMetricsCollector)
3571            .await
3572            .unwrap();
3573        assert_eq!(
3574            full_result_below, merged_result_below,
3575            "Query for value below minimum (-1) failed"
3576        );
3577
3578        // Test 8: Query value above maximum
3579        let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
3580        let full_result_above = full_index
3581            .search(&query_above_max, &NoOpMetricsCollector)
3582            .await
3583            .unwrap();
3584        let merged_result_above = merged_index
3585            .search(&query_above_max, &NoOpMetricsCollector)
3586            .await
3587            .unwrap();
3588        assert_eq!(
3589            full_result_above,
3590            merged_result_above,
3591            "Query for value above maximum ({}) failed",
3592            max_val + 1
3593        );
3594
3595        // === Range Query Tests ===
3596
3597        // Test 9: Cross-fragment range query (from first fragment to second fragment)
3598        let range_start = (DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
3599        let range_end = (DEFAULT_BTREE_BATCH_SIZE + 100) as i32;
3600        let query_cross_frag = SargableQuery::Range(
3601            std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
3602            std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
3603        );
3604        let full_result_cross = full_index
3605            .search(&query_cross_frag, &NoOpMetricsCollector)
3606            .await
3607            .unwrap();
3608        let merged_result_cross = merged_index
3609            .search(&query_cross_frag, &NoOpMetricsCollector)
3610            .await
3611            .unwrap();
3612        assert_eq!(
3613            full_result_cross, merged_result_cross,
3614            "Cross-fragment range query [{}, {}] failed",
3615            range_start, range_end
3616        );
3617
3618        // Test 10: Range query within single fragment
3619        let single_frag_start = 100i32;
3620        let single_frag_end = 200i32;
3621        let query_single_frag = SargableQuery::Range(
3622            std::collections::Bound::Included(ScalarValue::Int32(Some(single_frag_start))),
3623            std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_frag_end))),
3624        );
3625        let full_result_single = full_index
3626            .search(&query_single_frag, &NoOpMetricsCollector)
3627            .await
3628            .unwrap();
3629        let merged_result_single = merged_index
3630            .search(&query_single_frag, &NoOpMetricsCollector)
3631            .await
3632            .unwrap();
3633        assert_eq!(
3634            full_result_single, merged_result_single,
3635            "Single fragment range query [{}, {}] failed",
3636            single_frag_start, single_frag_end
3637        );
3638
3639        // Test 11: Large range query spanning all fragments
3640        let large_range_start = 100i32;
3641        let large_range_end = (3 * DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
3642        let query_large_range = SargableQuery::Range(
3643            std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
3644            std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
3645        );
3646        let full_result_large = full_index
3647            .search(&query_large_range, &NoOpMetricsCollector)
3648            .await
3649            .unwrap();
3650        let merged_result_large = merged_index
3651            .search(&query_large_range, &NoOpMetricsCollector)
3652            .await
3653            .unwrap();
3654        assert_eq!(
3655            full_result_large, merged_result_large,
3656            "Large range query [{}, {}] failed",
3657            large_range_start, large_range_end
3658        );
3659
3660        // === Range Boundary Query Tests ===
3661
3662        // Test 12: Less than query (implemented using range query, from minimum to specified value)
3663        let lt_val = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3664        let query_lt = SargableQuery::Range(
3665            std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
3666            std::collections::Bound::Excluded(ScalarValue::Int32(Some(lt_val))),
3667        );
3668        let full_result_lt = full_index
3669            .search(&query_lt, &NoOpMetricsCollector)
3670            .await
3671            .unwrap();
3672        let merged_result_lt = merged_index
3673            .search(&query_lt, &NoOpMetricsCollector)
3674            .await
3675            .unwrap();
3676        assert_eq!(
3677            full_result_lt, merged_result_lt,
3678            "Less than query (<{}) failed",
3679            lt_val
3680        );
3681
3682        // Test 13: Greater than query (implemented using range query, from specified value to maximum)
3683        let gt_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3684        let max_range_val = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3685        let query_gt = SargableQuery::Range(
3686            std::collections::Bound::Excluded(ScalarValue::Int32(Some(gt_val))),
3687            std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
3688        );
3689        let full_result_gt = full_index
3690            .search(&query_gt, &NoOpMetricsCollector)
3691            .await
3692            .unwrap();
3693        let merged_result_gt = merged_index
3694            .search(&query_gt, &NoOpMetricsCollector)
3695            .await
3696            .unwrap();
3697        assert_eq!(
3698            full_result_gt, merged_result_gt,
3699            "Greater than query (>{}) failed",
3700            gt_val
3701        );
3702
3703        // Test 14: Less than or equal query (implemented using range query, including boundary value)
3704        let lte_val = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3705        let query_lte = SargableQuery::Range(
3706            std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
3707            std::collections::Bound::Included(ScalarValue::Int32(Some(lte_val))),
3708        );
3709        let full_result_lte = full_index
3710            .search(&query_lte, &NoOpMetricsCollector)
3711            .await
3712            .unwrap();
3713        let merged_result_lte = merged_index
3714            .search(&query_lte, &NoOpMetricsCollector)
3715            .await
3716            .unwrap();
3717        assert_eq!(
3718            full_result_lte, merged_result_lte,
3719            "Less than or equal query (<={}) failed",
3720            lte_val
3721        );
3722
3723        // Test 15: Greater than or equal query (implemented using range query, including boundary value)
3724        let gte_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3725        let query_gte = SargableQuery::Range(
3726            std::collections::Bound::Included(ScalarValue::Int32(Some(gte_val))),
3727            std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
3728        );
3729        let full_result_gte = full_index
3730            .search(&query_gte, &NoOpMetricsCollector)
3731            .await
3732            .unwrap();
3733        let merged_result_gte = merged_index
3734            .search(&query_gte, &NoOpMetricsCollector)
3735            .await
3736            .unwrap();
3737        assert_eq!(
3738            full_result_gte, merged_result_gte,
3739            "Greater than or equal query (>={}) failed",
3740            gte_val
3741        );
3742    }
3743
3744    #[test]
3745    fn test_extract_partition_id() {
3746        // Test valid partition file names
3747        assert_eq!(
3748            super::extract_partition_id("part_123_page_data.lance").unwrap(),
3749            123
3750        );
3751        assert_eq!(
3752            super::extract_partition_id("part_456_page_lookup.lance").unwrap(),
3753            456
3754        );
3755        assert_eq!(
3756            super::extract_partition_id("part_4294967296_page_data.lance").unwrap(),
3757            4294967296
3758        );
3759
3760        // Test invalid file names
3761        assert!(super::extract_partition_id("invalid_filename.lance").is_err());
3762        assert!(super::extract_partition_id("part_abc_page_data.lance").is_err());
3763        assert!(super::extract_partition_id("part_123").is_err());
3764        assert!(super::extract_partition_id("part_").is_err());
3765    }
3766
3767    #[tokio::test]
3768    async fn test_cleanup_partition_files() {
3769        // Create a test store
3770        let tmpdir = TempObjDir::default();
3771        let test_store: Arc<dyn crate::scalar::IndexStore> = Arc::new(LanceIndexStore::new(
3772            Arc::new(ObjectStore::local()),
3773            tmpdir.clone(),
3774            Arc::new(LanceCache::no_cache()),
3775        ));
3776
3777        // Test files with different patterns
3778        let lookup_files = vec![
3779            "part_123_page_lookup.lance".to_string(),
3780            "invalid_lookup_file.lance".to_string(),
3781            "part_456_page_lookup.lance".to_string(),
3782        ];
3783
3784        let page_files = vec![
3785            "part_123_page_data.lance".to_string(),
3786            "invalid_page_file.lance".to_string(),
3787            "part_456_page_data.lance".to_string(),
3788        ];
3789
3790        // The cleanup function should handle both valid and invalid file patterns gracefully
3791        // This test mainly verifies that the function doesn't panic and handles edge cases
3792        super::cleanup_partition_files(test_store.as_ref(), &lookup_files, &page_files).await;
3793    }
3794
3795    #[tokio::test]
3796    async fn test_btree_null_handling_in_queries() {
3797        let store = Arc::new(LanceIndexStore::new(
3798            Arc::new(ObjectStore::memory()),
3799            Path::default(),
3800            Arc::new(LanceCache::no_cache()),
3801        ));
3802
3803        // Create test data: [null, 0, 5] at row IDs [0, 1, 2]
3804        // BTree expects sorted data with nulls first (or filtered out)
3805        let batch = record_batch!(
3806            ("value", Int32, [None, Some(0), Some(5)]),
3807            ("_rowid", UInt64, [0, 1, 2])
3808        )
3809        .unwrap();
3810        let stream = stream::once(futures::future::ok(batch.clone()));
3811        let stream = Box::pin(RecordBatchStreamAdapter::new(batch.schema(), stream));
3812
3813        // Train the btree index with FlatIndexMetadata as sub-index
3814        super::train_btree_index(stream, store.as_ref(), 256, None, None)
3815            .await
3816            .unwrap();
3817
3818        let cache = LanceCache::with_capacity(1024 * 1024);
3819        let index = super::BTreeIndex::load(store.clone(), None, &cache)
3820            .await
3821            .unwrap();
3822
3823        // Test 1: Search for value 5 - should return allow=[2], null=[0]
3824        let query = SargableQuery::Equals(ScalarValue::Int32(Some(5)));
3825        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3826
3827        match result {
3828            SearchResult::Exact(row_ids) => {
3829                let actual_rows: Vec<u64> = row_ids
3830                    .true_rows()
3831                    .row_addrs()
3832                    .unwrap()
3833                    .map(u64::from)
3834                    .collect();
3835                assert_eq!(actual_rows, vec![2], "Should find row 2 where value == 5");
3836
3837                // Check that null_row_ids contains row 0
3838                let null_row_ids = row_ids.null_rows();
3839                assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3840                let null_rows: Vec<u64> =
3841                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3842                assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3843            }
3844            _ => panic!("Expected Exact search result"),
3845        }
3846
3847        // Test 2: Range query [0, 3] - should return allow=[1], null=[0]
3848        let query = SargableQuery::Range(
3849            std::ops::Bound::Included(ScalarValue::Int32(Some(0))),
3850            std::ops::Bound::Included(ScalarValue::Int32(Some(3))),
3851        );
3852        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3853
3854        match result {
3855            SearchResult::Exact(row_ids) => {
3856                let actual_rows: Vec<u64> = row_ids
3857                    .true_rows()
3858                    .row_addrs()
3859                    .unwrap()
3860                    .map(u64::from)
3861                    .collect();
3862                assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 0");
3863
3864                // Should report row 0 as null
3865                let null_row_ids = row_ids.null_rows();
3866                assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3867                let null_rows: Vec<u64> =
3868                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3869                assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3870            }
3871            _ => panic!("Expected Exact search result"),
3872        }
3873
3874        // Test 3: IsIn query [0, 5] - should return allow=[1, 2], null=[0]
3875        let query = SargableQuery::IsIn(vec![
3876            ScalarValue::Int32(Some(0)),
3877            ScalarValue::Int32(Some(5)),
3878        ]);
3879        let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3880
3881        match result {
3882            SearchResult::Exact(row_ids) => {
3883                let mut actual_rows: Vec<u64> = row_ids
3884                    .true_rows()
3885                    .row_addrs()
3886                    .unwrap()
3887                    .map(u64::from)
3888                    .collect();
3889                actual_rows.sort();
3890                assert_eq!(
3891                    actual_rows,
3892                    vec![1, 2],
3893                    "Should find rows 1 and 2 where value in [0, 5]"
3894                );
3895
3896                // Should report row 0 as null
3897                let null_row_ids = row_ids.null_rows();
3898                assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3899                let null_rows: Vec<u64> =
3900                    null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3901                assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3902            }
3903            _ => panic!("Expected Exact search result"),
3904        }
3905    }
3906
3907    #[tokio::test]
3908    async fn test_range_btree_index_consistency() {
3909        // Setup stores for both indexes
3910        let full_tmpdir = TempObjDir::default();
3911        let full_store = Arc::new(LanceIndexStore::new(
3912            Arc::new(ObjectStore::local()),
3913            full_tmpdir.clone(),
3914            Arc::new(LanceCache::no_cache()),
3915        ));
3916
3917        let range_tmpdir = TempObjDir::default();
3918        let range_store = Arc::new(LanceIndexStore::new(
3919            Arc::new(ObjectStore::local()),
3920            range_tmpdir.clone(),
3921            Arc::new(LanceCache::no_cache()),
3922        ));
3923
3924        // Method 1: Build complete index directly using the same data
3925        // Create deterministic data for comparison - use 4 * DEFAULT_BTREE_BATCH_SIZE for testing
3926        let total_count = 4 * DEFAULT_BTREE_BATCH_SIZE;
3927        let full_data_gen = gen_batch()
3928            .col("value", array::step::<Int32Type>())
3929            .col("_rowid", array::step::<UInt64Type>())
3930            .into_df_stream(RowCount::from(total_count / 4), BatchCount::from(4));
3931        let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3932            full_data_gen.schema(),
3933            full_data_gen,
3934        ));
3935
3936        train_btree_index(
3937            full_data_source,
3938            full_store.as_ref(),
3939            DEFAULT_BTREE_BATCH_SIZE,
3940            None,
3941            None,
3942        )
3943        .await
3944        .unwrap();
3945
3946        // Method 2: Build range-based index using the same data split into ranges
3947        // Create range 1 index, intentionally make it not divisible by DEFAULT_BTREE_BATCH_SIZE
3948        let range1_gen = gen_batch()
3949            .col("value", array::step::<Int32Type>())
3950            .col("_rowid", array::step::<UInt64Type>())
3951            .into_df_stream(
3952                RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
3953                BatchCount::from(5),
3954            );
3955        let range1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3956            range1_gen.schema(),
3957            range1_gen,
3958        ));
3959
3960        train_btree_index(
3961            range1_data_source,
3962            range_store.as_ref(),
3963            DEFAULT_BTREE_BATCH_SIZE,
3964            None,
3965            Option::from(0u32),
3966        )
3967        .await
3968        .unwrap();
3969
3970        // Create range 2 index, also intentionally make it not divisible by DEFAULT_BTREE_BATCH_SIZE
3971        let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3972        let end_val = (4 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3973        let values_second_half: Vec<i32> = (start_val..end_val).collect();
3974        let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
3975        let range2_gen = gen_batch()
3976            .col("value", array::cycle::<Int32Type>(values_second_half))
3977            .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
3978            .into_df_stream(
3979                RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
3980                BatchCount::from(3),
3981            );
3982        let range2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3983            range2_gen.schema(),
3984            range2_gen,
3985        ));
3986
3987        train_btree_index(
3988            range2_data_source,
3989            range_store.as_ref(),
3990            DEFAULT_BTREE_BATCH_SIZE,
3991            None,
3992            Option::from(1u32),
3993        )
3994        .await
3995        .unwrap();
3996
3997        // Merge the fragment files
3998        let part_page_files = vec![
3999            part_page_data_file_path(0 << 32),
4000            part_page_data_file_path(1 << 32),
4001        ];
4002
4003        let part_lookup_files = vec![
4004            part_lookup_file_path(0 << 32),
4005            part_lookup_file_path(1 << 32),
4006        ];
4007
4008        super::merge_metadata_files(
4009            range_store.as_ref(),
4010            &part_page_files,
4011            &part_lookup_files,
4012            Option::from(1usize),
4013            noop_progress(),
4014        )
4015        .await
4016        .unwrap();
4017
4018        let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
4019            .await
4020            .unwrap();
4021
4022        let ranged_index = BTreeIndex::load(range_store.clone(), None, &LanceCache::no_cache())
4023            .await
4024            .unwrap();
4025
4026        // Equality Tests
4027
4028        // Test 1: Query for value 0
4029        let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
4030        let full_result_0 = full_index
4031            .search(&query_0, &NoOpMetricsCollector)
4032            .await
4033            .unwrap();
4034        let ranged_result_0 = ranged_index
4035            .search(&query_0, &NoOpMetricsCollector)
4036            .await
4037            .unwrap();
4038        assert_eq!(full_result_0, ranged_result_0, "Query for value 0 failed");
4039
4040        // Test 2: Query for value in middle of first batch (should be in first page)
4041        let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4042        let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
4043        let full_result_mid_first = full_index
4044            .search(&query_mid_first, &NoOpMetricsCollector)
4045            .await
4046            .unwrap();
4047        let ranged_result_mid_first = ranged_index
4048            .search(&query_mid_first, &NoOpMetricsCollector)
4049            .await
4050            .unwrap();
4051        assert_eq!(
4052            full_result_mid_first, ranged_result_mid_first,
4053            "Query for value {} failed",
4054            mid_first_batch
4055        );
4056
4057        // Test 3: Query for value in the last batch (should be in the second range file)
4058        let mid_last_batch = (DEFAULT_BTREE_BATCH_SIZE * 3 + (DEFAULT_BTREE_BATCH_SIZE / 2)) as i32;
4059        let query_mid_last = SargableQuery::Equals(ScalarValue::Int32(Some(mid_last_batch)));
4060        let full_result_mid_last = full_index
4061            .search(&query_mid_last, &NoOpMetricsCollector)
4062            .await
4063            .unwrap();
4064        let ranged_result_mid_last = ranged_index
4065            .search(&query_mid_last, &NoOpMetricsCollector)
4066            .await
4067            .unwrap();
4068        assert_eq!(
4069            full_result_mid_last, ranged_result_mid_last,
4070            "Query for value {} failed",
4071            mid_last_batch
4072        );
4073
4074        // Test 4: Query upper bound.
4075        let max_val = (4 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
4076        let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
4077        let full_result_max = full_index
4078            .search(&query_max, &NoOpMetricsCollector)
4079            .await
4080            .unwrap();
4081        let ranged_result_max = ranged_index
4082            .search(&query_max, &NoOpMetricsCollector)
4083            .await
4084            .unwrap();
4085        assert_eq!(
4086            full_result_max, ranged_result_max,
4087            "Query for maximum value {} failed",
4088            max_val
4089        );
4090
4091        // Test 5: Query first value of the second page file.
4092        let second_first_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4093        let query_second_first = SargableQuery::Equals(ScalarValue::Int32(Some(second_first_val)));
4094        let full_result_second_first = full_index
4095            .search(&query_second_first, &NoOpMetricsCollector)
4096            .await
4097            .unwrap();
4098        let ranged_result_second_first = ranged_index
4099            .search(&query_second_first, &NoOpMetricsCollector)
4100            .await
4101            .unwrap();
4102        assert_eq!(
4103            full_result_second_first, ranged_result_second_first,
4104            "Query for first value of the second page file {} failed",
4105            second_first_val
4106        );
4107
4108        // Test 6: Query value below the minimum
4109        let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
4110        let full_result_below = full_index
4111            .search(&query_below_min, &NoOpMetricsCollector)
4112            .await
4113            .unwrap();
4114        let ranged_result_below = ranged_index
4115            .search(&query_below_min, &NoOpMetricsCollector)
4116            .await
4117            .unwrap();
4118        assert_eq!(
4119            full_result_below, ranged_result_below,
4120            "Query for value below minimum (-1) failed"
4121        );
4122
4123        // Test 7: Query value above the maximum
4124        let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
4125        let full_result_above = full_index
4126            .search(&query_above_max, &NoOpMetricsCollector)
4127            .await
4128            .unwrap();
4129        let ranged_result_above = ranged_index
4130            .search(&query_above_max, &NoOpMetricsCollector)
4131            .await
4132            .unwrap();
4133        assert_eq!(
4134            full_result_above,
4135            ranged_result_above,
4136            "Query for value above maximum ({}) failed",
4137            max_val + 1
4138        );
4139
4140        // Range Tests
4141
4142        // Test 8: Cross-range query: One range including different values from adjacent range files.
4143        let range_start =
4144            (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2 - 100) as i32;
4145        let range_end = range_start + 200;
4146        let query_cross_range = SargableQuery::Range(
4147            std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
4148            std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
4149        );
4150        let full_result_cross = full_index
4151            .search(&query_cross_range, &NoOpMetricsCollector)
4152            .await
4153            .unwrap();
4154        let ranged_result_cross = ranged_index
4155            .search(&query_cross_range, &NoOpMetricsCollector)
4156            .await
4157            .unwrap();
4158        assert_eq!(
4159            full_result_cross, ranged_result_cross,
4160            "Cross-range range query [{}, {}] failed",
4161            range_start, range_end
4162        );
4163
4164        // Test 9 Test simple range within a single page file
4165        let single_range_start = (DEFAULT_BTREE_BATCH_SIZE * 4 - 300) as i32;
4166        let single_range_end = single_range_start + 200;
4167        let query_single_range = SargableQuery::Range(
4168            std::collections::Bound::Included(ScalarValue::Int32(Some(single_range_start))),
4169            std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_range_end))),
4170        );
4171        let full_result_single = full_index
4172            .search(&query_single_range, &NoOpMetricsCollector)
4173            .await
4174            .unwrap();
4175        let ranged_result_single = ranged_index
4176            .search(&query_single_range, &NoOpMetricsCollector)
4177            .await
4178            .unwrap();
4179        assert_eq!(
4180            full_result_single, ranged_result_single,
4181            "Single range query [{}, {}] failed",
4182            single_range_start, single_range_end
4183        );
4184
4185        // Test 10: Large range query spanning almost all values
4186        let large_range_start = 100_i32;
4187        let large_range_end = (DEFAULT_BTREE_BATCH_SIZE * 4 - 100) as i32;
4188        let query_large_range = SargableQuery::Range(
4189            std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
4190            std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
4191        );
4192        let full_result_single = full_index
4193            .search(&query_large_range, &NoOpMetricsCollector)
4194            .await
4195            .unwrap();
4196        let ranged_result_single = ranged_index
4197            .search(&query_large_range, &NoOpMetricsCollector)
4198            .await
4199            .unwrap();
4200        assert_eq!(
4201            full_result_single, ranged_result_single,
4202            "Single fragment range query [{}, {}] failed",
4203            large_range_start, large_range_end
4204        );
4205
4206        let remap_dir = TempObjDir::default();
4207        let remap_store = Arc::new(LanceIndexStore::new(
4208            Arc::new(ObjectStore::local()),
4209            remap_dir.clone(),
4210            Arc::new(LanceCache::no_cache()),
4211        ));
4212
4213        // Remap with a no-op mapping.  The remapped index should be identical to the original
4214        ranged_index
4215            .remap(&HashMap::default(), remap_store.as_ref())
4216            .await
4217            .unwrap();
4218
4219        let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
4220            .await
4221            .unwrap();
4222
4223        assert_eq!(remap_index.page_lookup, ranged_index.page_lookup);
4224
4225        let ranged_pages = range_store
4226            .open_index_file(part_page_data_file_path(1 << 32).as_str())
4227            .await
4228            .unwrap();
4229        let remapped_pages = remap_store
4230            .open_index_file(part_page_data_file_path(1 << 32).as_str())
4231            .await
4232            .unwrap();
4233
4234        assert_eq!(ranged_pages.num_rows(), remapped_pages.num_rows());
4235
4236        let original_data = ranged_pages
4237            .read_record_batch(0, ranged_pages.num_rows() as u64)
4238            .await
4239            .unwrap();
4240        let remapped_data = remapped_pages
4241            .read_record_batch(0, remapped_pages.num_rows() as u64)
4242            .await
4243            .unwrap();
4244
4245        assert_eq!(original_data, remapped_data);
4246    }
4247
4248    #[tokio::test]
4249    async fn test_update_ranged_index() {
4250        // Setup stores for both indexes
4251        let old_tmpdir = TempObjDir::default();
4252        let old_store = Arc::new(LanceIndexStore::new(
4253            Arc::new(ObjectStore::local()),
4254            old_tmpdir.clone(),
4255            Arc::new(LanceCache::no_cache()),
4256        ));
4257
4258        let new_tmpdir = TempObjDir::default();
4259        let new_store = Arc::new(LanceIndexStore::new(
4260            Arc::new(ObjectStore::local()),
4261            new_tmpdir.clone(),
4262            Arc::new(LanceCache::no_cache()),
4263        ));
4264
4265        // Create range 1 index, intentionally make it not divisible by DEFAULT_BTREE_BATCH_SIZE
4266        let range1_gen = gen_batch()
4267            .col("value", array::step::<Int32Type>())
4268            .col("_rowid", array::step::<UInt64Type>())
4269            .into_df_stream(
4270                RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4271                BatchCount::from(5),
4272            );
4273        let range1_data_source = Box::pin(RecordBatchStreamAdapter::new(
4274            range1_gen.schema(),
4275            range1_gen,
4276        ));
4277
4278        train_btree_index(
4279            range1_data_source,
4280            old_store.as_ref(),
4281            DEFAULT_BTREE_BATCH_SIZE,
4282            None,
4283            Option::from(1u32),
4284        )
4285        .await
4286        .unwrap();
4287
4288        // Create range 2 index, also intentionally make it not divisible by DEFAULT_BTREE_BATCH_SIZE
4289        let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4290        let end_val = (4 * DEFAULT_BTREE_BATCH_SIZE) as i32;
4291        let values_second_half: Vec<i32> = (start_val..end_val).collect();
4292        let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
4293        let range2_gen = gen_batch()
4294            .col("value", array::cycle::<Int32Type>(values_second_half))
4295            .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
4296            .into_df_stream(
4297                RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4298                BatchCount::from(3),
4299            );
4300        let range2_data_source = Box::pin(RecordBatchStreamAdapter::new(
4301            range2_gen.schema(),
4302            range2_gen,
4303        ));
4304
4305        train_btree_index(
4306            range2_data_source,
4307            old_store.as_ref(),
4308            DEFAULT_BTREE_BATCH_SIZE,
4309            None,
4310            Option::from(2u32),
4311        )
4312        .await
4313        .unwrap();
4314
4315        // Merge the fragment files
4316        let part_page_files = vec![
4317            part_page_data_file_path(1 << 32),
4318            part_page_data_file_path(2 << 32),
4319        ];
4320
4321        let part_lookup_files = vec![
4322            part_lookup_file_path(1 << 32),
4323            part_lookup_file_path(2 << 32),
4324        ];
4325
4326        super::merge_metadata_files(
4327            old_store.as_ref(),
4328            &part_page_files,
4329            &part_lookup_files,
4330            Option::from(1usize),
4331            noop_progress(),
4332        )
4333        .await
4334        .unwrap();
4335
4336        // create some update data
4337        let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2) as i32;
4338        let end_val = (DEFAULT_BTREE_BATCH_SIZE * 3) as i32;
4339        let row_id_delta = (DEFAULT_BTREE_BATCH_SIZE * 3) as i32;
4340        let values: Vec<i32> = (start_val..end_val).collect();
4341        let row_ids: Vec<u64> =
4342            ((start_val + row_id_delta) as u64..(end_val + row_id_delta) as u64).collect();
4343        let update_data = gen_batch()
4344            .col("value", array::cycle::<Int32Type>(values))
4345            .col("_rowid", array::cycle::<UInt64Type>(row_ids))
4346            .into_df_stream(
4347                RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4348                BatchCount::from(2),
4349            );
4350        let update_data_source = Box::pin(RecordBatchStreamAdapter::new(
4351            update_data.schema(),
4352            update_data,
4353        ));
4354
4355        let ranged_index = BTreeIndex::load(old_store.clone(), None, &LanceCache::no_cache())
4356            .await
4357            .unwrap();
4358
4359        // update the ranged index
4360        ranged_index
4361            .update(update_data_source, new_store.as_ref(), None)
4362            .await
4363            .expect("Error in updating ranged index");
4364
4365        let updated_index = BTreeIndex::load(new_store.clone(), None, &LanceCache::no_cache())
4366            .await
4367            .unwrap();
4368
4369        assert!(
4370            updated_index.ranges_to_files.is_none(),
4371            "Updated ranged-btree-index should fall back to non-ranged"
4372        );
4373
4374        let updated_value = (DEFAULT_BTREE_BATCH_SIZE * 2 + (DEFAULT_BTREE_BATCH_SIZE / 2)) as i32;
4375        let updated_query = SargableQuery::Equals(ScalarValue::Int32(Some(updated_value)));
4376
4377        let query_result = updated_index
4378            .search(&updated_query, &NoOpMetricsCollector)
4379            .await
4380            .unwrap();
4381        match query_result {
4382            SearchResult::Exact(row_id_map) => {
4383                assert!(
4384                    row_id_map.selected(updated_value as u64),
4385                    "Updated index should contain original rowids."
4386                );
4387                assert!(
4388                    row_id_map.selected((updated_value + row_id_delta) as u64),
4389                    "Updated index should contain new rowids"
4390                );
4391            }
4392            _ => {
4393                panic!("Btree search result should always be Exact.");
4394            }
4395        }
4396    }
4397
4398    #[tokio::test]
4399    async fn test_update_with_exact_row_id_filter() {
4400        let old_tmpdir = TempObjDir::default();
4401        let old_store = Arc::new(LanceIndexStore::new(
4402            Arc::new(ObjectStore::local()),
4403            old_tmpdir.clone(),
4404            Arc::new(LanceCache::no_cache()),
4405        ));
4406
4407        let new_tmpdir = TempObjDir::default();
4408        let new_store = Arc::new(LanceIndexStore::new(
4409            Arc::new(ObjectStore::local()),
4410            new_tmpdir.clone(),
4411            Arc::new(LanceCache::no_cache()),
4412        ));
4413
4414        let old_data = gen_batch()
4415            .col("value", array::step::<Int32Type>())
4416            .col("_rowid", array::step::<UInt64Type>())
4417            .into_df_stream(RowCount::from(512), BatchCount::from(2));
4418        let old_data_source = Box::pin(RecordBatchStreamAdapter::new(old_data.schema(), old_data));
4419        train_btree_index(
4420            old_data_source,
4421            old_store.as_ref(),
4422            DEFAULT_BTREE_BATCH_SIZE,
4423            None,
4424            None,
4425        )
4426        .await
4427        .unwrap();
4428
4429        let index = BTreeIndex::load(old_store.clone(), None, &LanceCache::no_cache())
4430            .await
4431            .unwrap();
4432
4433        let new_data = gen_batch()
4434            .col("value", array::step_custom::<Int32Type>(2000, 1))
4435            .col("_rowid", array::step_custom::<UInt64Type>(2000, 1))
4436            .into_df_stream(RowCount::from(100), BatchCount::from(1));
4437        let new_data_source = Box::pin(RecordBatchStreamAdapter::new(new_data.schema(), new_data));
4438
4439        let mut retained_old_rows = RowAddrTreeMap::new();
4440        retained_old_rows.insert_range(0..64);
4441        retained_old_rows.insert_range(300..364);
4442
4443        index
4444            .update(
4445                new_data_source,
4446                new_store.as_ref(),
4447                Some(OldIndexDataFilter::RowIds(retained_old_rows)),
4448            )
4449            .await
4450            .unwrap();
4451
4452        let updated_index = BTreeIndex::load(new_store.clone(), None, &LanceCache::no_cache())
4453            .await
4454            .unwrap();
4455
4456        let present = |value: i32| {
4457            let updated_index = updated_index.clone();
4458            async move {
4459                let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4460                match updated_index
4461                    .search(&query, &NoOpMetricsCollector)
4462                    .await
4463                    .unwrap()
4464                {
4465                    SearchResult::Exact(row_id_map) => row_id_map.selected(value as u64),
4466                    _ => unreachable!("Btree search result should always be Exact"),
4467                }
4468            }
4469        };
4470
4471        assert!(present(12).await);
4472        assert!(present(320).await);
4473        assert!(!present(120).await);
4474        assert!(!present(420).await);
4475        assert!(present(2005).await);
4476    }
4477
4478    /// Rust equivalent of Python test `test_btree_remap_big_deletions`
4479    ///
4480    /// This test verifies that btree index remapping works correctly when a large
4481    /// portion of the data is deleted. The Python test:
4482    /// 1. Writes 15K rows in 3 fragments (values 0-14999)
4483    /// 2. Creates a btree index (will have multiple pages)
4484    /// 3. Deletes rows where a > 1000 AND a < 10000 (deletes values 1001-9999)
4485    /// 4. Runs compaction (materializes deletions via remap)
4486    /// 5. Verifies the index still works for remaining values
4487    #[tokio::test]
4488    async fn test_btree_remap_big_deletions() {
4489        let tmpdir = TempObjDir::default();
4490        let test_store = Arc::new(LanceIndexStore::new(
4491            Arc::new(ObjectStore::local()),
4492            tmpdir.clone(),
4493            Arc::new(LanceCache::no_cache()),
4494        ));
4495
4496        // Generate 15000 rows with values 0-14999 and row_ids 0-14999
4497        // Using a smaller batch size to ensure we get multiple pages
4498        let batch_size = 4096;
4499        let total_rows = 15000;
4500
4501        let stream = gen_batch()
4502            .col("value", array::step::<Int32Type>())
4503            .col("_rowid", array::step::<UInt64Type>())
4504            .into_df_stream(RowCount::from(total_rows), BatchCount::from(1));
4505
4506        train_btree_index(stream, test_store.as_ref(), batch_size, None, None)
4507            .await
4508            .unwrap();
4509
4510        let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
4511            .await
4512            .unwrap();
4513
4514        // Create a mapping that simulates deleting rows where value > 1000 AND value < 10000
4515        // Since values match row_ids in our test data:
4516        // - Rows 0-1000 (values 0-1000) are kept with same row_ids
4517        // - Rows 1001-9999 (values 1001-9999) are deleted (mapped to None)
4518        // - Rows 10000-14999 (values 10000-14999) are remapped to new row_ids 1001-5999
4519        let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();
4520
4521        // Mark deleted rows (values 1001-9999)
4522        for old_id in 1001..10000 {
4523            mapping.insert(old_id, None);
4524        }
4525
4526        let mut new_id_counter = 100_000;
4527
4528        // Remap all other rows
4529        for old_id in (0..1000).chain(10000..15000) {
4530            let new_id = new_id_counter;
4531            new_id_counter += 1;
4532            mapping.insert(old_id, Some(new_id));
4533        }
4534
4535        let remap_dir = TempObjDir::default();
4536        let remap_store = Arc::new(LanceIndexStore::new(
4537            Arc::new(ObjectStore::local()),
4538            remap_dir.clone(),
4539            Arc::new(LanceCache::no_cache()),
4540        ));
4541
4542        // Remap the index with our deletion mapping
4543        index.remap(&mapping, remap_store.as_ref()).await.unwrap();
4544
4545        let remapped_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
4546            .await
4547            .unwrap();
4548
4549        // Verify values that should exist (values 0-1000 and 10000-14999)
4550        // These correspond to: original values 0-1000 at row_ids 0-1000
4551        // and original values 10000-14999 at new row_ids 1001-5999
4552        let should_exist = vec![0, 500, 1000, 10000, 13000, 14000, 14999];
4553        for value in should_exist {
4554            let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4555            let result = remapped_index
4556                .search(&query, &NoOpMetricsCollector)
4557                .await
4558                .unwrap();
4559            match result {
4560                SearchResult::Exact(row_id_map) => {
4561                    assert!(
4562                        !row_id_map.is_empty(),
4563                        "Value {} should exist in remapped index but was not found",
4564                        value
4565                    );
4566                }
4567                _ => {
4568                    panic!("Btree search result should always be Exact.");
4569                }
4570            }
4571        }
4572
4573        // Verify values that should NOT exist (values 1001-9999 were deleted)
4574        let should_not_exist = vec![1001, 5000, 8000, 9999];
4575        for value in should_not_exist {
4576            let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4577            let result = remapped_index
4578                .search(&query, &NoOpMetricsCollector)
4579                .await
4580                .unwrap();
4581            match result {
4582                SearchResult::Exact(row_id_map) => {
4583                    assert!(
4584                        row_id_map.is_empty(),
4585                        "Value {} should NOT exist in remapped index but was found",
4586                        value
4587                    );
4588                }
4589                _ => {
4590                    panic!("Btree search result should always be Exact.");
4591                }
4592            }
4593        }
4594    }
4595
4596    /// Regression test: BTree search must track null row IDs for non-IsNull
4597    /// queries, even when no pages match the queried value.
4598    ///
4599    /// Without this, `NOT(x = val)` when `val` is absent from the data would
4600    /// produce an empty null set, causing NULL rows to incorrectly pass.
4601    #[tokio::test]
4602    async fn test_search_tracks_nulls_for_absent_value() {
4603        use arrow_array::{Int32Array, UInt64Array};
4604
4605        let tmpdir = TempObjDir::default();
4606        let test_store = Arc::new(LanceIndexStore::new(
4607            Arc::new(ObjectStore::local()),
4608            tmpdir.clone(),
4609            Arc::new(LanceCache::no_cache()),
4610        ));
4611
4612        // Create data with 80% nulls so that training produces separate
4613        // all-null pages (which are not in the BTree map). Non-null values
4614        // are all in [100, 5099], so value 0 never appears.
4615        let num_rows = 5000u64;
4616        let values: Int32Array = (0..num_rows)
4617            .map(|i| {
4618                if i % 5 != 0 {
4619                    None // 80% null
4620                } else {
4621                    Some(100 + i as i32) // non-null values in [100, 5099]
4622                }
4623            })
4624            .collect();
4625        let row_ids = UInt64Array::from_iter_values(0..num_rows);
4626        let data = arrow_array::RecordBatch::try_from_iter(vec![
4627            ("value", Arc::new(values) as arrow_array::ArrayRef),
4628            ("_rowid", Arc::new(row_ids) as arrow_array::ArrayRef),
4629        ])
4630        .unwrap();
4631
4632        let schema = data.schema();
4633        let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
4634            schema,
4635            stream::iter(vec![Ok(data)]),
4636        ));
4637        train_btree_index(stream, test_store.as_ref(), num_rows, None, None)
4638            .await
4639            .unwrap();
4640
4641        let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
4642            .await
4643            .unwrap();
4644
4645        // Verify we have all-null pages (the bug depends on this)
4646        assert!(
4647            !index.page_lookup.all_null_pages.is_empty(),
4648            "Test setup requires all-null pages; got null_pages={}, all_null_pages={}",
4649            index.page_lookup.null_pages.len(),
4650            index.page_lookup.all_null_pages.len(),
4651        );
4652
4653        let metrics = NoOpMetricsCollector;
4654
4655        // Search for Equals(0) — value 0 doesn't exist in any page
4656        let result = index
4657            .search(
4658                &SargableQuery::Equals(ScalarValue::Int32(Some(0))),
4659                &metrics,
4660            )
4661            .await
4662            .unwrap();
4663
4664        match result {
4665            SearchResult::Exact(set) => {
4666                // No rows should be TRUE (value 0 doesn't exist)
4667                assert!(set.true_rows().is_empty(), "No rows should match Equals(0)");
4668                // NULL rows MUST be tracked as null
4669                assert!(
4670                    !set.null_rows().is_empty(),
4671                    "Null rows must be tracked even when no pages match the value"
4672                );
4673            }
4674            _ => panic!("BTree search should return Exact"),
4675        }
4676
4677        // Also verify Range query tracks nulls when no values match
4678        let result = index
4679            .search(
4680                &SargableQuery::Range(
4681                    std::ops::Bound::Unbounded,
4682                    std::ops::Bound::Excluded(ScalarValue::Int32(Some(50))),
4683                ),
4684                &metrics,
4685            )
4686            .await
4687            .unwrap();
4688
4689        match result {
4690            SearchResult::Exact(set) => {
4691                assert!(set.true_rows().is_empty(), "No rows should be < 50");
4692                assert!(
4693                    !set.null_rows().is_empty(),
4694                    "Null rows must be tracked for range queries too"
4695                );
4696            }
4697            _ => panic!("BTree search should return Exact"),
4698        }
4699    }
4700}