1use std::{
5 any::Any,
6 cmp::Ordering,
7 collections::{BTreeMap, BinaryHeap, HashMap, HashSet},
8 fmt::{Debug, Display},
9 ops::Bound,
10 sync::Arc,
11};
12
13use super::{
14 AnyQuery, BuiltinIndexType, IndexReader, IndexStore, IndexWriter, MetricsCollector,
15 OldIndexDataFilter, SargableQuery, ScalarIndex, ScalarIndexParams, SearchResult,
16 compute_next_prefix,
17};
18use crate::{Index, IndexType};
19use crate::{
20 frag_reuse::FragReuseIndex,
21 progress::{IndexBuildProgress, noop_progress},
22 scalar::{
23 CreatedIndex, UpdateCriteria,
24 expression::{SargableQueryParser, ScalarQueryParser},
25 registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},
26 },
27};
28use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
29use crate::{pbold, scalar::btree::flat::FlatIndex};
30use arrow_arith::numeric::add;
31use arrow_array::{Array, RecordBatch, UInt32Array, new_empty_array};
32use arrow_schema::{DataType, Field, Schema, SortOptions};
33use async_trait::async_trait;
34use datafusion::physical_plan::{
35 ExecutionPlan, SendableRecordBatchStream,
36 sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
37 union::UnionExec,
38};
39use datafusion_common::{DataFusionError, ScalarValue};
40use datafusion_physical_expr::{PhysicalSortExpr, expressions::Column};
41use deepsize::DeepSizeOf;
42use futures::{
43 FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
44 future::BoxFuture,
45 stream::{self},
46};
47use lance_core::{
48 Error, ROW_ID, Result,
49 cache::{CacheKey, LanceCache, WeakLanceCache},
50 error::LanceOptionExt,
51 utils::{
52 mask::NullableRowAddrSet,
53 tokio::get_num_compute_intensive_cpus,
54 tracing::{IO_TYPE_LOAD_SCALAR_PART, TRACE_IO_EVENTS},
55 },
56};
57use lance_datafusion::{
58 chunker::chunk_concat_stream,
59 exec::{LanceExecutionOptions, OneShotExec, execute_plan},
60};
61use lance_io::object_store::ObjectStore;
62use log::{debug, warn};
63use object_store::path::Path;
64use rangemap::RangeInclusiveMap;
65use roaring::RoaringBitmap;
66use serde::{Deserialize, Serialize, Serializer};
67use tracing::{info, instrument};
68
// Per-page search structure: every b-tree page read from the page-data file is
// decoded into a `flat::FlatIndex` (see `BTreeIndex::read_page`).
mod flat;
70
// ---- File names -------------------------------------------------------------

/// Index file read by `BTreeIndex::load` to build the per-page lookup table.
const BTREE_LOOKUP_NAME: &str = "page_lookup.lance";
/// Page-data file used when the index is not range partitioned.
const BTREE_PAGES_NAME: &str = "page_data.lance";

// ---- Lookup-file schema metadata ------------------------------------------

/// Batch size used to read a page when the lookup metadata lacks (or cannot
/// parse) `BATCH_SIZE_META_KEY`.
pub const DEFAULT_BTREE_BATCH_SIZE: u64 = 4_096;
/// Metadata key holding the batch size used when reading pages.
const BATCH_SIZE_META_KEY: &str = "batch_size";
/// Assumed partitioning when `RANGE_PARTITIONED_META_KEY` is absent or unparsable.
const DEFAULT_RANGE_PARTITIONED: bool = false;
/// Metadata key flagging that pages are spread over several range-partition files.
const RANGE_PARTITIONED_META_KEY: &str = "range_partitioned";
/// Metadata key holding a JSON list of `(partition id, page count)` pairs.
const PAGE_NUM_PER_RANGE_PARTITION_META_KEY: &str = "page_num_per_range_partition";

// ---- Misc ---------------------------------------------------------------------

/// Version number of this b-tree index implementation (usage not visible here).
const BTREE_INDEX_VERSION: u32 = 0;
/// Column name for indexed values (usage not visible here).
pub(crate) const BTREE_VALUES_COLUMN: &str = "values";
/// Column name for row ids (usage not visible here).
pub(crate) const BTREE_IDS_COLUMN: &str = "ids";
81
82#[derive(Clone, Debug)]
84pub struct OrderableScalarValue(pub ScalarValue);
85
86impl DeepSizeOf for OrderableScalarValue {
87 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
88 self.0.size() - std::mem::size_of::<ScalarValue>()
90 }
91}
92
93impl Display for OrderableScalarValue {
94 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
95 std::fmt::Display::fmt(&self.0, f)
96 }
97}
98
99impl PartialEq for OrderableScalarValue {
100 fn eq(&self, other: &Self) -> bool {
101 self.0.eq(&other.0)
102 }
103}
104
105impl Eq for OrderableScalarValue {}
106
107impl PartialOrd for OrderableScalarValue {
108 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
109 Some(self.cmp(other))
110 }
111}
112
113impl Ord for OrderableScalarValue {
119 fn cmp(&self, other: &Self) -> Ordering {
120 use ScalarValue::*;
121 match (&self.0, &other.0) {
125 (Decimal32(v1, p1, s1), Decimal32(v2, p2, s2)) => {
126 if p1.eq(p2) && s1.eq(s2) {
127 v1.cmp(v2)
128 } else {
129 panic!("Attempt to compare decimals with unequal precision / scale")
131 }
132 }
133 (Decimal32(v1, _, _), Null) => {
134 if v1.is_none() {
135 Ordering::Equal
136 } else {
137 Ordering::Greater
138 }
139 }
140 (Decimal32(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
141 (Decimal64(v1, p1, s1), Decimal64(v2, p2, s2)) => {
142 if p1.eq(p2) && s1.eq(s2) {
143 v1.cmp(v2)
144 } else {
145 panic!("Attempt to compare decimals with unequal precision / scale")
147 }
148 }
149 (Decimal64(v1, _, _), Null) => {
150 if v1.is_none() {
151 Ordering::Equal
152 } else {
153 Ordering::Greater
154 }
155 }
156 (Decimal64(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
157 (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
158 if p1.eq(p2) && s1.eq(s2) {
159 v1.cmp(v2)
160 } else {
161 panic!("Attempt to compare decimals with unequal precision / scale")
163 }
164 }
165 (Decimal128(v1, _, _), Null) => {
166 if v1.is_none() {
167 Ordering::Equal
168 } else {
169 Ordering::Greater
170 }
171 }
172 (Decimal128(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
173 (Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
174 if p1.eq(p2) && s1.eq(s2) {
175 v1.cmp(v2)
176 } else {
177 panic!("Attempt to compare decimals with unequal precision / scale")
179 }
180 }
181 (Decimal256(v1, _, _), Null) => {
182 if v1.is_none() {
183 Ordering::Equal
184 } else {
185 Ordering::Greater
186 }
187 }
188 (Decimal256(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
189
190 (Boolean(v1), Boolean(v2)) => v1.cmp(v2),
191 (Boolean(v1), Null) => {
192 if v1.is_none() {
193 Ordering::Equal
194 } else {
195 Ordering::Greater
196 }
197 }
198 (Boolean(_), _) => panic!("Attempt to compare boolean with non-boolean"),
199 (Float32(v1), Float32(v2)) => match (v1, v2) {
200 (Some(f1), Some(f2)) => f1.total_cmp(f2),
201 (None, Some(_)) => Ordering::Less,
202 (Some(_), None) => Ordering::Greater,
203 (None, None) => Ordering::Equal,
204 },
205 (Float32(v1), Null) => {
206 if v1.is_none() {
207 Ordering::Equal
208 } else {
209 Ordering::Greater
210 }
211 }
212 (Float32(_), _) => panic!("Attempt to compare f32 with non-f32"),
213 (Float64(v1), Float64(v2)) => match (v1, v2) {
214 (Some(f1), Some(f2)) => f1.total_cmp(f2),
215 (None, Some(_)) => Ordering::Less,
216 (Some(_), None) => Ordering::Greater,
217 (None, None) => Ordering::Equal,
218 },
219 (Float64(v1), Null) => {
220 if v1.is_none() {
221 Ordering::Equal
222 } else {
223 Ordering::Greater
224 }
225 }
226 (Float64(_), _) => panic!("Attempt to compare f64 with non-f64"),
227 (Float16(v1), Float16(v2)) => match (v1, v2) {
228 (Some(f1), Some(f2)) => f1.total_cmp(f2),
229 (None, Some(_)) => Ordering::Less,
230 (Some(_), None) => Ordering::Greater,
231 (None, None) => Ordering::Equal,
232 },
233 (Float16(v1), Null) => {
234 if v1.is_none() {
235 Ordering::Equal
236 } else {
237 Ordering::Greater
238 }
239 }
240 (Float16(_), _) => panic!("Attempt to compare f16 with non-f16"),
241 (Int8(v1), Int8(v2)) => v1.cmp(v2),
242 (Int8(v1), Null) => {
243 if v1.is_none() {
244 Ordering::Equal
245 } else {
246 Ordering::Greater
247 }
248 }
249 (Int8(_), _) => panic!("Attempt to compare Int8 with non-Int8"),
250 (Int16(v1), Int16(v2)) => v1.cmp(v2),
251 (Int16(v1), Null) => {
252 if v1.is_none() {
253 Ordering::Equal
254 } else {
255 Ordering::Greater
256 }
257 }
258 (Int16(_), _) => panic!("Attempt to compare Int16 with non-Int16"),
259 (Int32(v1), Int32(v2)) => v1.cmp(v2),
260 (Int32(v1), Null) => {
261 if v1.is_none() {
262 Ordering::Equal
263 } else {
264 Ordering::Greater
265 }
266 }
267 (Int32(_), _) => panic!("Attempt to compare Int32 with non-Int32"),
268 (Int64(v1), Int64(v2)) => v1.cmp(v2),
269 (Int64(v1), Null) => {
270 if v1.is_none() {
271 Ordering::Equal
272 } else {
273 Ordering::Greater
274 }
275 }
276 (Int64(_), _) => panic!("Attempt to compare Int64 with non-Int64"),
277 (UInt8(v1), UInt8(v2)) => v1.cmp(v2),
278 (UInt8(v1), Null) => {
279 if v1.is_none() {
280 Ordering::Equal
281 } else {
282 Ordering::Greater
283 }
284 }
285 (UInt8(_), _) => panic!("Attempt to compare UInt8 with non-UInt8"),
286 (UInt16(v1), UInt16(v2)) => v1.cmp(v2),
287 (UInt16(v1), Null) => {
288 if v1.is_none() {
289 Ordering::Equal
290 } else {
291 Ordering::Greater
292 }
293 }
294 (UInt16(_), _) => panic!("Attempt to compare UInt16 with non-UInt16"),
295 (UInt32(v1), UInt32(v2)) => v1.cmp(v2),
296 (UInt32(v1), Null) => {
297 if v1.is_none() {
298 Ordering::Equal
299 } else {
300 Ordering::Greater
301 }
302 }
303 (UInt32(_), _) => panic!("Attempt to compare UInt32 with non-UInt32"),
304 (UInt64(v1), UInt64(v2)) => v1.cmp(v2),
305 (UInt64(v1), Null) => {
306 if v1.is_none() {
307 Ordering::Equal
308 } else {
309 Ordering::Greater
310 }
311 }
312 (UInt64(_), _) => panic!("Attempt to compare UInt64 with non-UInt64"),
313 (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Utf8(v2) | Utf8View(v2) | LargeUtf8(v2)) => {
314 v1.cmp(v2)
315 }
316 (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Null) => {
317 if v1.is_none() {
318 Ordering::Equal
319 } else {
320 Ordering::Greater
321 }
322 }
323 (Utf8(_) | Utf8View(_) | LargeUtf8(_), _) => {
324 panic!("Attempt to compare Utf8 with non-Utf8")
325 }
326 (
327 Binary(v1) | LargeBinary(v1) | BinaryView(v1),
328 Binary(v2) | LargeBinary(v2) | BinaryView(v2),
329 ) => v1.cmp(v2),
330 (Binary(v1) | LargeBinary(v1) | BinaryView(v1), Null) => {
331 if v1.is_none() {
332 Ordering::Equal
333 } else {
334 Ordering::Greater
335 }
336 }
337 (Binary(_) | LargeBinary(_) | BinaryView(_), _) => {
338 panic!("Attempt to compare Binary with non-Binary")
339 }
340 (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.cmp(v2),
341 (FixedSizeBinary(_, v1), Null) => {
342 if v1.is_none() {
343 Ordering::Equal
344 } else {
345 Ordering::Greater
346 }
347 }
348 (FixedSizeBinary(_, _), _) => {
349 panic!("Attempt to compare FixedSizeBinary with non-FixedSizeBinary")
350 }
351 (FixedSizeList(left), FixedSizeList(right)) => {
352 if left.eq(right) {
353 todo!()
354 } else {
355 panic!(
356 "Attempt to compare fixed size list elements with different widths/fields"
357 )
358 }
359 }
360 (FixedSizeList(left), Null) => {
361 if left.is_null(0) {
362 Ordering::Equal
363 } else {
364 Ordering::Greater
365 }
366 }
367 (FixedSizeList(_), _) => {
368 panic!("Attempt to compare FixedSizeList with non-FixedSizeList")
369 }
370 (List(_), List(_)) => todo!(),
371 (List(left), Null) => {
372 if left.is_null(0) {
373 Ordering::Equal
374 } else {
375 Ordering::Greater
376 }
377 }
378 (List(_), _) => {
379 panic!("Attempt to compare List with non-List")
380 }
381 (LargeList(_), _) => todo!(),
382 (Map(_), Map(_)) => todo!(),
383 (Map(left), Null) => {
384 if left.is_null(0) {
385 Ordering::Equal
386 } else {
387 Ordering::Greater
388 }
389 }
390 (Map(_), _) => {
391 panic!("Attempt to compare Map with non-Map")
392 }
393 (Date32(v1), Date32(v2)) => v1.cmp(v2),
394 (Date32(v1), Null) => {
395 if v1.is_none() {
396 Ordering::Equal
397 } else {
398 Ordering::Greater
399 }
400 }
401 (Date32(_), _) => panic!("Attempt to compare Date32 with non-Date32"),
402 (Date64(v1), Date64(v2)) => v1.cmp(v2),
403 (Date64(v1), Null) => {
404 if v1.is_none() {
405 Ordering::Equal
406 } else {
407 Ordering::Greater
408 }
409 }
410 (Date64(_), _) => panic!("Attempt to compare Date64 with non-Date64"),
411 (Time32Second(v1), Time32Second(v2)) => v1.cmp(v2),
412 (Time32Second(v1), Null) => {
413 if v1.is_none() {
414 Ordering::Equal
415 } else {
416 Ordering::Greater
417 }
418 }
419 (Time32Second(_), _) => panic!("Attempt to compare Time32Second with non-Time32Second"),
420 (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.cmp(v2),
421 (Time32Millisecond(v1), Null) => {
422 if v1.is_none() {
423 Ordering::Equal
424 } else {
425 Ordering::Greater
426 }
427 }
428 (Time32Millisecond(_), _) => {
429 panic!("Attempt to compare Time32Millisecond with non-Time32Millisecond")
430 }
431 (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.cmp(v2),
432 (Time64Microsecond(v1), Null) => {
433 if v1.is_none() {
434 Ordering::Equal
435 } else {
436 Ordering::Greater
437 }
438 }
439 (Time64Microsecond(_), _) => {
440 panic!("Attempt to compare Time64Microsecond with non-Time64Microsecond")
441 }
442 (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.cmp(v2),
443 (Time64Nanosecond(v1), Null) => {
444 if v1.is_none() {
445 Ordering::Equal
446 } else {
447 Ordering::Greater
448 }
449 }
450 (Time64Nanosecond(_), _) => {
451 panic!("Attempt to compare Time64Nanosecond with non-Time64Nanosecond")
452 }
453 (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.cmp(v2),
454 (TimestampSecond(v1, _), Null) => {
455 if v1.is_none() {
456 Ordering::Equal
457 } else {
458 Ordering::Greater
459 }
460 }
461 (TimestampSecond(_, _), _) => {
462 panic!("Attempt to compare TimestampSecond with non-TimestampSecond")
463 }
464 (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.cmp(v2),
465 (TimestampMillisecond(v1, _), Null) => {
466 if v1.is_none() {
467 Ordering::Equal
468 } else {
469 Ordering::Greater
470 }
471 }
472 (TimestampMillisecond(_, _), _) => {
473 panic!("Attempt to compare TimestampMillisecond with non-TimestampMillisecond")
474 }
475 (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.cmp(v2),
476 (TimestampMicrosecond(v1, _), Null) => {
477 if v1.is_none() {
478 Ordering::Equal
479 } else {
480 Ordering::Greater
481 }
482 }
483 (TimestampMicrosecond(_, _), _) => {
484 panic!("Attempt to compare TimestampMicrosecond with non-TimestampMicrosecond")
485 }
486 (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.cmp(v2),
487 (TimestampNanosecond(v1, _), Null) => {
488 if v1.is_none() {
489 Ordering::Equal
490 } else {
491 Ordering::Greater
492 }
493 }
494 (TimestampNanosecond(_, _), _) => {
495 panic!("Attempt to compare TimestampNanosecond with non-TimestampNanosecond")
496 }
497 (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.cmp(v2),
498 (IntervalYearMonth(v1), Null) => {
499 if v1.is_none() {
500 Ordering::Equal
501 } else {
502 Ordering::Greater
503 }
504 }
505 (IntervalYearMonth(_), _) => {
506 panic!("Attempt to compare IntervalYearMonth with non-IntervalYearMonth")
507 }
508 (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.cmp(v2),
509 (IntervalDayTime(v1), Null) => {
510 if v1.is_none() {
511 Ordering::Equal
512 } else {
513 Ordering::Greater
514 }
515 }
516 (IntervalDayTime(_), _) => {
517 panic!("Attempt to compare IntervalDayTime with non-IntervalDayTime")
518 }
519 (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.cmp(v2),
520 (IntervalMonthDayNano(v1), Null) => {
521 if v1.is_none() {
522 Ordering::Equal
523 } else {
524 Ordering::Greater
525 }
526 }
527 (IntervalMonthDayNano(_), _) => {
528 panic!("Attempt to compare IntervalMonthDayNano with non-IntervalMonthDayNano")
529 }
530 (DurationSecond(v1), DurationSecond(v2)) => v1.cmp(v2),
531 (DurationSecond(v1), Null) => {
532 if v1.is_none() {
533 Ordering::Equal
534 } else {
535 Ordering::Greater
536 }
537 }
538 (DurationSecond(_), _) => {
539 panic!("Attempt to compare DurationSecond with non-DurationSecond")
540 }
541 (DurationMillisecond(v1), DurationMillisecond(v2)) => v1.cmp(v2),
542 (DurationMillisecond(v1), Null) => {
543 if v1.is_none() {
544 Ordering::Equal
545 } else {
546 Ordering::Greater
547 }
548 }
549 (DurationMillisecond(_), _) => {
550 panic!("Attempt to compare DurationMillisecond with non-DurationMillisecond")
551 }
552 (DurationMicrosecond(v1), DurationMicrosecond(v2)) => v1.cmp(v2),
553 (DurationMicrosecond(v1), Null) => {
554 if v1.is_none() {
555 Ordering::Equal
556 } else {
557 Ordering::Greater
558 }
559 }
560 (DurationMicrosecond(_), _) => {
561 panic!("Attempt to compare DurationMicrosecond with non-DurationMicrosecond")
562 }
563 (DurationNanosecond(v1), DurationNanosecond(v2)) => v1.cmp(v2),
564 (DurationNanosecond(v1), Null) => {
565 if v1.is_none() {
566 Ordering::Equal
567 } else {
568 Ordering::Greater
569 }
570 }
571 (DurationNanosecond(_), _) => {
572 panic!("Attempt to compare DurationNanosecond with non-DurationNanosecond")
573 }
574 (Struct(_arr), Struct(_arr2)) => todo!(),
575 (Struct(arr), Null) => {
576 if arr.is_empty() {
577 Ordering::Equal
578 } else {
579 Ordering::Greater
580 }
581 }
582 (Struct(_arr), _) => panic!("Attempt to compare Struct with non-Struct"),
583 (Dictionary(_k1, _v1), Dictionary(_k2, _v2)) => todo!(),
584 (Dictionary(_, v1), Null) => Self(*v1.clone()).cmp(&Self(ScalarValue::Null)),
585 (Dictionary(_, _), _) => panic!("Attempt to compare Dictionary with non-Dictionary"),
586 (Union(_, _, _), _) => todo!("Support for union scalars"),
588 (RunEndEncoded(_, _, _), _) => {
589 todo!("Support for run-end encoded scalars")
590 }
591 (Null, Null) => Ordering::Equal,
592 (Null, _) => todo!(),
593 }
594 }
595}
596
597#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
598struct PageRecord {
599 max: OrderableScalarValue,
600 page_number: u32,
601}
602
/// Extra neighbour lookups for ordered maps.
trait BTreeMapExt<K, V> {
    /// Returns the entry with the greatest key strictly less than `key`, or `None`
    /// when every key is `>= key`.
    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)>;
}

impl<K: Ord, V> BTreeMapExt<K, V> for BTreeMap<K, V> {
    fn largest_node_less(&self, key: &K) -> Option<(&K, &V)> {
        // `..key` is the half-open range (Unbounded, Excluded(key)); its last
        // element is the predecessor of `key`.
        self.range::<K, _>(..key).next_back()
    }
}
613
614#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
616pub struct BTreeLookup {
617 tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
618 null_pages: Vec<u32>,
620 all_null_pages: Vec<u32>,
622}
623
624impl BTreeLookup {
625 fn empty() -> Self {
626 Self {
627 tree: BTreeMap::new(),
628 null_pages: Vec::new(),
629 all_null_pages: Vec::new(),
630 }
631 }
632}
633
/// How a single page relates to a query, as decided from the page lookup alone.
#[derive(Debug, Copy, Clone)]
enum Matches {
    /// The page may contain matching rows and has to be searched.
    Some(u32),
    /// Every row of the page satisfies the query; `BTreeIndex::search_page` returns
    /// the page's rows without searching it.
    All(u32),
}

impl Matches {
    /// The page number carried by either variant.
    fn page_id(&self) -> u32 {
        let (Self::Some(page) | Self::All(page)) = *self;
        page
    }
}
648
impl BTreeLookup {
    /// Builds a lookup from its already-assembled parts.
    fn new(
        tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
        null_pages: Vec<u32>,
        all_null_pages: Vec<u32>,
    ) -> Self {
        Self {
            tree,
            null_pages,
            all_null_pages,
        }
    }

    /// Pages that may contain a value equal to `query`.
    ///
    /// A null query is answered from the null page lists. Otherwise this delegates
    /// to `pages_between` with `(Included(query), Excluded(query))`. `pages_between`
    /// widens an excluded upper bound to included when choosing tree keys, so pages
    /// whose min equals `query` are still visited. With an excluded upper bound, no
    /// page can be classified `Matches::All` (that would need `min >= query > max`),
    /// so every returned page is `Matches::Some` (assuming each page's min <= max).
    fn pages_eq(&self, query: &OrderableScalarValue) -> Vec<Matches> {
        if query.0.is_null() {
            self.pages_null()
        } else {
            self.pages_between((Bound::Included(query), Bound::Excluded(query)))
        }
    }

    /// Pages that may contain any of `values`.
    ///
    /// Results are sorted ascending by page id, deduplicated, and all reported as
    /// `Matches::Some`; the `All`/`Some` distinction from `pages_eq` is discarded.
    fn pages_in(&self, values: impl IntoIterator<Item = OrderableScalarValue>) -> Vec<Matches> {
        let page_lists = values
            .into_iter()
            .map(|val| {
                self.pages_eq(&val)
                    .into_iter()
                    .map(|matches| matches.page_id())
            })
            .collect::<Vec<_>>();
        // Exact total used only to pre-size the heap.
        let total_size = page_lists.iter().map(|set| set.len()).sum();
        let mut heap = BinaryHeap::with_capacity(total_size);
        for page_list in page_lists {
            heap.extend(page_list);
        }
        // `into_sorted_vec` yields ascending order, so `dedup` removes all repeats.
        let mut all_pages = heap.into_sorted_vec();
        all_pages.dedup();
        all_pages.into_iter().map(Matches::Some).collect()
    }

    /// Pages that may contain values inside `range`, each classified as
    /// `Matches::All` (the whole page lies inside the range) or `Matches::Some`.
    ///
    /// Tree keys are page minimums, so the scan starts at the largest key strictly
    /// below the query's lower bound. That page may begin before the bound and still
    /// extend into it.
    fn pages_between(
        &self,
        range: (Bound<&OrderableScalarValue>, Bound<&OrderableScalarValue>),
    ) -> Vec<Matches> {
        // Both Included and Excluded query bounds map to Included(predecessor key).
        // An Excluded bound still needs the page whose max may equal the bound. If no
        // key is smaller, the scan starts at the beginning of the tree.
        let lower_bound = match range.0 {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Included(lower) => self
                .tree
                .largest_node_less(lower)
                .map(|val| Bound::Included(val.0))
                .unwrap_or(Bound::Unbounded),
            Bound::Excluded(lower) => self
                .tree
                .largest_node_less(lower)
                .map(|val| Bound::Included(val.0))
                .unwrap_or(Bound::Unbounded),
        };
        // An excluded upper bound is widened to included for key selection. This keeps
        // pages whose min equals the bound, e.g. for the `[x, x)` query built by
        // `pages_eq`.
        let upper_bound = match range.1 {
            Bound::Unbounded => Bound::Unbounded,
            Bound::Included(upper) => Bound::Included(upper),
            Bound::Excluded(upper) => Bound::Included(upper),
        };

        // `BTreeMap::range` panics on an inverted (or empty exclusive) range, so such
        // ranges return no pages up front. NOTE(review): `lower_bound` is only ever
        // Unbounded or Included above, so the arms with `Bound::Excluded(lower)` are
        // unreachable.
        match (lower_bound, upper_bound) {
            (Bound::Excluded(lower), Bound::Excluded(upper))
            | (Bound::Excluded(lower), Bound::Included(upper))
            | (Bound::Included(lower), Bound::Excluded(upper)) => {
                if lower >= upper {
                    return vec![];
                }
            }
            (Bound::Included(lower), Bound::Included(upper)) => {
                if lower > upper {
                    return vec![];
                }
            }
            _ => {}
        }

        let mut matches = Vec::new();

        for (min, page_records) in self.tree.range((lower_bound, upper_bound)) {
            // The sentinel key inserted for the last page's max has an empty list and
            // contributes nothing here.
            for page_record in page_records {
                // Skip pages ending before the scan's lower bound.
                // NOTE(review): this compares against the tree-derived `lower_bound`,
                // not the query bound `range.0`. With sorted pages (min <= max) every
                // visited page already has max >= that key, so this likely never prunes
                // anything. Confirm whether `range.0` was intended.
                match lower_bound {
                    Bound::Unbounded => {}
                    Bound::Included(lower) => {
                        if page_record.max.cmp(lower) == Ordering::Less {
                            continue;
                        }
                    }
                    Bound::Excluded(lower) => {
                        if page_record.max.cmp(lower) != Ordering::Greater {
                            continue;
                        }
                    }
                }
                // Null bounds cannot prove containment; search the page.
                if min.0.is_null() || page_record.max.0.is_null() {
                    matches.push(Matches::Some(page_record.page_number));
                    continue;
                }

                // The page starts before (or at an excluded) lower bound: only partially
                // inside the range.
                match range.0 {
                    Bound::Excluded(lower) => {
                        if min.cmp(lower) != Ordering::Greater {
                            matches.push(Matches::Some(page_record.page_number));
                            continue;
                        }
                    }
                    Bound::Included(lower) => {
                        if min.cmp(lower) == Ordering::Less {
                            matches.push(Matches::Some(page_record.page_number));
                            continue;
                        }
                    }
                    Bound::Unbounded => {}
                }
                // The page ends after (or at an excluded) upper bound: only partially
                // inside the range.
                match range.1 {
                    Bound::Excluded(upper) => {
                        if page_record.max.cmp(upper) != Ordering::Less {
                            matches.push(Matches::Some(page_record.page_number));
                            continue;
                        }
                    }
                    Bound::Included(upper) => {
                        if page_record.max.cmp(upper) == Ordering::Greater {
                            matches.push(Matches::Some(page_record.page_number));
                            continue;
                        }
                    }
                    Bound::Unbounded => {}
                }
                // [min, max] lies entirely within the query range.
                matches.push(Matches::All(page_record.page_number));
            }
        }

        matches
    }

    /// Pages relevant to an `IS NULL` query.
    ///
    /// Partially-null pages must be searched (`Some`); pages holding only nulls
    /// match entirely (`All`).
    fn pages_null(&self) -> Vec<Matches> {
        self.null_pages
            .iter()
            .map(|page_id| Matches::Some(*page_id))
            .chain(self.all_null_pages.iter().copied().map(Matches::All))
            .collect()
    }
}
832
833#[derive(Clone)]
836struct LazyIndexReader {
837 index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
838 store: Arc<dyn IndexStore>,
839 ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
840}
841
842impl LazyIndexReader {
843 fn new(
844 store: Arc<dyn IndexStore>,
845 ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
846 ) -> Self {
847 Self {
848 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
849 store,
850 ranges_to_files,
851 }
852 }
853
854 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
855 let mut reader = self.index_reader.lock().await;
856 if reader.is_none() {
857 let index_reader = if let Some(ranges_to_files) = &self.ranges_to_files {
858 Arc::new(LazyRangedIndexReader::new(
859 self.store.clone(),
860 ranges_to_files.clone(),
861 ))
862 } else {
863 self.store.open_index_file(BTREE_PAGES_NAME).await?
864 };
865 *reader = Some(index_reader);
866 }
867 Ok(reader.as_ref().unwrap().clone())
868 }
869}
870
871struct LazyRangedIndexReader {
873 #[allow(clippy::type_complexity)]
874 readers:
875 Arc<tokio::sync::Mutex<HashMap<String, Arc<tokio::sync::OnceCell<Arc<dyn IndexReader>>>>>>,
876 store: Arc<dyn IndexStore>,
877 ranges_to_files: Arc<RangeInclusiveMap<u32, (String, u32)>>,
878}
879
880impl LazyRangedIndexReader {
881 fn new(
882 store: Arc<dyn IndexStore>,
883 ranges_to_files: Arc<RangeInclusiveMap<u32, (String, u32)>>,
884 ) -> Self {
885 Self {
886 readers: Arc::new(tokio::sync::Mutex::new(HashMap::new())),
887 store,
888 ranges_to_files,
889 }
890 }
891
892 async fn get_reader(&self, file_name: &str) -> Result<Arc<dyn IndexReader>> {
893 let reader_cell = {
894 let mut guard = self.readers.lock().await;
895 guard
896 .entry(file_name.to_string())
897 .or_insert_with(|| Arc::new(tokio::sync::OnceCell::new()))
898 .clone()
899 };
900 let reader = reader_cell
901 .get_or_try_init(|| async { self.store.open_index_file(file_name).await })
902 .await?;
903 Ok(reader.clone())
904 }
905
906 async fn get_reader_and_local_page_idx(
907 &self,
908 page_idx: u32,
909 ) -> Result<(Arc<dyn IndexReader>, u32)> {
910 let (page_file_name, offset) = self.ranges_to_files.get(&page_idx).ok_or_else(|| {
911 Error::internal(format!(
912 "Unexpected page index, index {} is out of range.",
913 page_idx
914 ))
915 })?;
916 let reader = self.get_reader(page_file_name).await?;
917 Ok((reader.clone(), page_idx - *offset))
918 }
919}
920
921#[async_trait]
922impl IndexReader for LazyRangedIndexReader {
923 async fn read_record_batch(&self, n: u64, batch_size: u64) -> Result<RecordBatch> {
924 let (reader, local_page_idx) = self.get_reader_and_local_page_idx(n as u32).await?;
925 reader
926 .read_record_batch(local_page_idx as u64, batch_size)
927 .await
928 }
929
930 async fn read_range(
931 &self,
932 _range: std::ops::Range<usize>,
933 _projection: Option<&[&str]>,
934 ) -> Result<RecordBatch> {
935 unimplemented!("Read range is not implemented for lazy page file reader.");
936 }
937
938 async fn num_batches(&self, batch_size: u64) -> u32 {
939 let mut total_batches = 0;
940 for (_, (file_name, _)) in self.ranges_to_files.iter() {
941 let reader = self
942 .get_reader(file_name)
943 .await
944 .unwrap_or_else(|_| panic!("Cannot open page file {}.", file_name));
945 total_batches += reader.as_ref().num_batches(batch_size).await;
946 }
947 total_batches
948 }
949
950 fn num_rows(&self) -> usize {
951 unimplemented!("only async functions are available for lazy page index reader.");
952 }
953
954 fn schema(&self) -> &lance_core::datatypes::Schema {
955 unimplemented!("only async functions are available for lazy page index reader.");
956 }
957}
958
959#[derive(Debug, Clone, DeepSizeOf)]
974pub struct CachedScalarIndex(Arc<dyn ScalarIndex>);
975
976impl CachedScalarIndex {
977 pub fn new(index: Arc<dyn ScalarIndex>) -> Self {
978 Self(index)
979 }
980
981 pub fn into_inner(self) -> Arc<dyn ScalarIndex> {
982 self.0
983 }
984}
985
986#[derive(Debug, Clone)]
987pub struct BTreePageKey {
988 pub page_number: u32,
989}
990
991impl CacheKey for BTreePageKey {
992 type ValueType = FlatIndex;
993
994 fn key(&self) -> std::borrow::Cow<'_, str> {
995 format!("page-{}", self.page_number).into()
996 }
997
998 fn type_name() -> &'static str {
999 "BTreePage"
1000 }
1001}
1002
1003#[derive(Clone, Debug)]
1006pub struct BTreeIndex {
1007 page_lookup: Arc<BTreeLookup>,
1008 index_cache: WeakLanceCache,
1009 store: Arc<dyn IndexStore>,
1010 data_type: DataType,
1011 batch_size: u64,
1012
1013 ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
1042 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1043}
1044
1045impl DeepSizeOf for BTreeIndex {
1046 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
1047 self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context)
1050 }
1051}
1052
1053impl BTreeIndex {
1054 #[allow(clippy::too_many_arguments)]
1055 fn new(
1056 page_lookup: Arc<BTreeLookup>,
1057 store: Arc<dyn IndexStore>,
1058 data_type: DataType,
1059 index_cache: WeakLanceCache,
1060 batch_size: u64,
1061 ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
1062 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1063 ) -> Self {
1064 Self {
1065 page_lookup,
1066 store,
1067 data_type,
1068 index_cache,
1069 batch_size,
1070 ranges_to_files,
1071 frag_reuse_index,
1072 }
1073 }
1074
1075 async fn lookup_page(
1076 &self,
1077 page_number: u32,
1078 index_reader: LazyIndexReader,
1079 metrics: &dyn MetricsCollector,
1080 ) -> Result<Arc<FlatIndex>> {
1081 self.index_cache
1082 .get_or_insert_with_key(BTreePageKey { page_number }, move || async move {
1083 self.read_page(page_number, index_reader, metrics).await
1084 })
1085 .await
1086 }
1087
1088 #[instrument(level = "debug", skip_all)]
1089 async fn read_page(
1090 &self,
1091 page_number: u32,
1092 index_reader: LazyIndexReader,
1093 metrics: &dyn MetricsCollector,
1094 ) -> Result<FlatIndex> {
1095 metrics.record_part_load();
1096 info!(target: TRACE_IO_EVENTS, r#type=IO_TYPE_LOAD_SCALAR_PART, index_type="btree", part_id=page_number);
1097 let index_reader = index_reader.get().await?;
1098 let mut serialized_page = index_reader
1099 .read_record_batch(page_number as u64, self.batch_size)
1100 .await?;
1101 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
1102 serialized_page =
1103 frag_reuse_index_ref.remap_row_ids_record_batch(serialized_page, 1)?;
1104 }
1105 FlatIndex::try_new(serialized_page)
1106 }
1107
1108 async fn search_page(
1109 &self,
1110 query: &SargableQuery,
1111 matches: Matches,
1112 index_reader: LazyIndexReader,
1113 metrics: &dyn MetricsCollector,
1114 ) -> Result<NullableRowAddrSet> {
1115 let subindex = self
1116 .lookup_page(matches.page_id(), index_reader, metrics)
1117 .await?;
1118
1119 match matches {
1120 Matches::Some(_) => {
1121 subindex.search(query, metrics)
1126 }
1127 Matches::All(_) => Ok(match query {
1128 SargableQuery::IsNull() => subindex.all_ignore_nulls(),
1130 _ => subindex.all(),
1131 }),
1132 }
1133 }
1134
1135 #[instrument(level = "debug", skip_all)]
1136 fn try_from_serialized(
1137 data: RecordBatch,
1138 store: Arc<dyn IndexStore>,
1139 index_cache: &LanceCache,
1140 batch_size: u64,
1141 ranges_to_files: Option<Arc<RangeInclusiveMap<u32, (String, u32)>>>,
1142 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1143 ) -> Result<Self> {
1144 let mut map = BTreeMap::<OrderableScalarValue, Vec<PageRecord>>::new();
1145 let mut null_pages = Vec::<u32>::new();
1147 let mut all_null_pages = Vec::<u32>::new();
1149
1150 if data.num_rows() == 0 {
1151 let data_type = data.column(0).data_type().clone();
1152 let page_lookup = Arc::new(BTreeLookup::empty());
1153 return Ok(Self::new(
1154 page_lookup,
1155 store,
1156 data_type,
1157 WeakLanceCache::from(index_cache),
1158 batch_size,
1159 ranges_to_files,
1160 frag_reuse_index,
1161 ));
1162 }
1163
1164 let mins = data.column(0);
1165 let maxs = data.column(1);
1166 let null_counts = data
1167 .column(2)
1168 .as_any()
1169 .downcast_ref::<UInt32Array>()
1170 .unwrap();
1171 let page_numbers = data
1172 .column(3)
1173 .as_any()
1174 .downcast_ref::<UInt32Array>()
1175 .unwrap();
1176
1177 for idx in 0..data.num_rows() {
1178 let min = OrderableScalarValue(ScalarValue::try_from_array(&mins, idx)?);
1179 let max = OrderableScalarValue(ScalarValue::try_from_array(&maxs, idx)?);
1180 let null_count = null_counts.values()[idx];
1181 let page_number = page_numbers.values()[idx];
1182
1183 if max.0.is_null() {
1185 all_null_pages.push(page_number);
1186 continue;
1188 } else {
1189 map.entry(min)
1190 .or_default()
1191 .push(PageRecord { max, page_number });
1192 }
1193
1194 if null_count > 0 {
1195 null_pages.push(page_number);
1196 }
1197 }
1198
1199 let last_max = ScalarValue::try_from_array(&maxs, data.num_rows() - 1)?;
1200 map.entry(OrderableScalarValue(last_max)).or_default();
1201
1202 let data_type = mins.data_type();
1203
1204 let page_lookup = Arc::new(BTreeLookup::new(map, null_pages, all_null_pages));
1205
1206 Ok(Self::new(
1207 page_lookup,
1208 store,
1209 data_type.clone(),
1210 WeakLanceCache::from(index_cache),
1211 batch_size,
1212 ranges_to_files,
1213 frag_reuse_index,
1214 ))
1215 }
1216
1217 async fn load(
1218 store: Arc<dyn IndexStore>,
1219 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1220 index_cache: &LanceCache,
1221 ) -> Result<Arc<Self>> {
1222 let page_lookup_file = store.open_index_file(BTREE_LOOKUP_NAME).await?;
1223 let num_rows_in_lookup = page_lookup_file.num_rows();
1224 let serialized_lookup = page_lookup_file
1225 .read_range(0..num_rows_in_lookup, None)
1226 .await?;
1227 let file_schema = page_lookup_file.schema();
1228 let batch_size = file_schema
1229 .metadata
1230 .get(BATCH_SIZE_META_KEY)
1231 .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
1232 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
1233
1234 let range_partitioned = file_schema
1235 .metadata
1236 .get(RANGE_PARTITIONED_META_KEY)
1237 .map(|bs| bs.parse().unwrap_or(DEFAULT_RANGE_PARTITIONED))
1238 .unwrap_or(DEFAULT_RANGE_PARTITIONED);
1239 let ranges_to_files = if range_partitioned {
1243 let part_sizes_str = file_schema
1244 .metadata
1245 .get(PAGE_NUM_PER_RANGE_PARTITION_META_KEY)
1246 .expect("Range-partitioned Btree lookup file must have page-number-per-range-file metadata!");
1247 let part_sizes_vec: Vec<(u64, u32)> = serde_json::from_str(part_sizes_str)?;
1248 let mut offset: u32 = 0;
1249
1250 let range_map = part_sizes_vec
1251 .into_iter()
1252 .map(|(id, size)| {
1253 let range = offset..=(offset + size - 1);
1254 let file_with_size = (part_page_data_file_path(id), offset);
1255 offset += size;
1256 (range, file_with_size)
1257 })
1258 .collect();
1259
1260 Some(Arc::new(range_map))
1261 } else {
1262 None
1263 };
1264
1265 Ok(Arc::new(Self::try_from_serialized(
1266 serialized_lookup,
1267 store,
1268 index_cache,
1269 batch_size,
1270 ranges_to_files,
1271 frag_reuse_index,
1272 )?))
1273 }
1274
1275 fn train_schema(&self) -> Schema {
1277 let value_field = Field::new(VALUE_COLUMN_NAME, self.data_type.clone(), true);
1278 let row_id_field = Field::new(ROW_ID, DataType::UInt64, false);
1279 Schema::new(vec![value_field, row_id_field])
1280 }
1281
1282 async fn into_data_stream(self) -> Result<SendableRecordBatchStream> {
1284 let lazy_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1285 let reader = lazy_reader.get().await?;
1286 let new_schema = Arc::new(self.train_schema());
1287 let new_schema_clone = new_schema.clone();
1288 let reader_stream = IndexReaderStream::new(reader, self.batch_size).await;
1289 let batches = reader_stream
1290 .map(|fut| fut.map_err(DataFusionError::from))
1291 .buffered(self.store.io_parallelism())
1292 .map_ok(move |batch| {
1293 RecordBatch::try_new(
1294 new_schema.clone(),
1295 vec![batch.column(0).clone(), batch.column(1).clone()],
1296 )
1297 .unwrap()
1298 })
1299 .boxed();
1300 Ok(Box::pin(RecordBatchStreamAdapter::new(
1301 new_schema_clone,
1302 batches,
1303 )))
1304 }
1305
1306 async fn combine_old_new(
1307 self,
1308 new_data: SendableRecordBatchStream,
1309 chunk_size: u64,
1310 old_data_filter: Option<OldIndexDataFilter>,
1311 ) -> Result<SendableRecordBatchStream> {
1312 let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?;
1313
1314 let new_input = Arc::new(OneShotExec::new(new_data));
1315 let old_stream = self.into_data_stream().await?;
1316 let old_stream = match old_data_filter {
1317 Some(filter) => filter_row_ids(old_stream, filter),
1318 None => old_stream,
1319 };
1320 let old_input = Arc::new(OneShotExec::new(old_stream));
1321 debug_assert_eq!(
1322 old_input.schema().flattened_fields().len(),
1323 new_input.schema().flattened_fields().len()
1324 );
1325
1326 let sort_expr = PhysicalSortExpr {
1327 expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
1328 options: SortOptions {
1329 descending: false,
1330 nulls_first: true,
1331 },
1332 };
1333 let all_data = UnionExec::try_new(vec![old_input, new_input])?;
1336 let ordered = Arc::new(SortPreservingMergeExec::new([sort_expr].into(), all_data));
1337
1338 let unchunked = execute_plan(
1339 ordered,
1340 LanceExecutionOptions {
1341 use_spilling: true,
1342 ..Default::default()
1343 },
1344 )?;
1345 Ok(chunk_concat_stream(unchunked, chunk_size as usize))
1346 }
1347}
1348
1349fn filter_row_ids(
1352 stream: SendableRecordBatchStream,
1353 old_data_filter: OldIndexDataFilter,
1354) -> SendableRecordBatchStream {
1355 let schema = stream.schema();
1356 let filtered = stream.map(move |batch_result| {
1357 let batch = batch_result?;
1358 let row_ids = batch[ROW_ID]
1359 .as_any()
1360 .downcast_ref::<arrow_array::UInt64Array>()
1361 .ok_or_else(|| Error::internal("expected UInt64Array for row_id column"))?;
1362 let mask = old_data_filter.filter_row_ids(row_ids);
1363 Ok(arrow_select::filter::filter_record_batch(&batch, &mask)?)
1364 });
1365 Box::pin(RecordBatchStreamAdapter::new(schema, filtered))
1366}
1367
1368fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
1369 match bound {
1370 Bound::Unbounded => Bound::Unbounded,
1371 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
1372 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
1373 }
1374}
1375
1376fn serialize_with_display<T: Display, S: Serializer>(
1377 value: &Option<T>,
1378 serializer: S,
1379) -> std::result::Result<S::Ok, S::Error> {
1380 if let Some(value) = value {
1381 serializer.collect_str(value)
1382 } else {
1383 serializer.collect_str("N/A")
1384 }
1385}
1386
/// JSON summary produced by `BTreeIndex::statistics`.
///
/// `min` and `max` are rendered as strings through their `Display` impl, or as `"N/A"`
/// when absent.
#[derive(Serialize)]
struct BTreeStatistics {
    /// Smallest key in the page lookup, if any.
    #[serde(serialize_with = "serialize_with_display")]
    min: Option<OrderableScalarValue>,
    /// Largest key in the page lookup, if any.
    #[serde(serialize_with = "serialize_with_display")]
    max: Option<OrderableScalarValue>,
    /// Page count as reported by `statistics`.
    num_pages: u32,
}
1395
/// Generic `Index` behaviour for the BTree scalar index.
#[async_trait]
impl Index for BTreeIndex {
    fn as_any(&self) -> &dyn Any {
        self
    }

    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
        self
    }

    /// Always fails: a BTree index is not a vector index.
    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
        Err(Error::not_supported_source(
            "BTreeIndex is not vector index".into(),
        ))
    }

    /// Reads every page of the index and inserts it into the index cache.
    ///
    /// Up to `get_num_compute_intensive_cpus()` pages are read concurrently. They are
    /// consumed in completion order and inserted under their `BTreePageKey`.
    ///
    /// # Errors
    ///
    /// Fails if a page read fails, or if the cache reports that an insert did not happen
    /// (the cache is no longer available).
    async fn prewarm(&self) -> Result<()> {
        let index_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
        let reader = index_reader.get().await?;
        let num_pages = reader.num_batches(self.batch_size).await;
        let mut pages = stream::iter(0..num_pages)
            .map(|page_idx| {
                // Each task gets its own handle to the shared lazy reader.
                let index_reader = index_reader.clone();
                async move {
                    let page = self
                        .read_page(page_idx, index_reader, &NoOpMetricsCollector)
                        .await?;
                    Result::Ok((page_idx, page))
                }
            })
            .buffer_unordered(get_num_compute_intensive_cpus());

        while let Some((page_idx, page)) = pages.try_next().await? {
            let inserted = self
                .index_cache
                .insert_with_key(
                    &BTreePageKey {
                        page_number: page_idx,
                    },
                    Arc::new(page),
                )
                .await;

            // A failed insert means prewarming cannot have any effect, so report it.
            if !inserted {
                return Err(Error::internal(
                    "Failed to prewarm index: cache is no longer available".to_string(),
                ));
            }
        }

        Ok(())
    }

    fn index_type(&self) -> IndexType {
        IndexType::BTree
    }

    /// Returns `{min, max, num_pages}` as JSON.
    ///
    /// `min` and `max` are the first and last keys of the page lookup tree.
    fn statistics(&self) -> Result<serde_json::Value> {
        let min = self
            .page_lookup
            .tree
            .first_key_value()
            .map(|(k, _)| k.clone());
        let max = self
            .page_lookup
            .tree
            .last_key_value()
            .map(|(k, _)| k.clone());
        serde_json::to_value(&BTreeStatistics {
            // NOTE(review): this is the number of keys in the lookup tree. If `tree` is
            // the min-keyed map built in `try_from_serialized`, pages that share a min
            // collapse into one key. An extra key is also added for the last page's max,
            // and all-null pages are not keyed at all. Confirm whether an exact page
            // count is intended.
            num_pages: self.page_lookup.tree.len() as u32,
            min,
            max,
        })
        .map_err(|err| err.into())
    }

    /// Returns the set of fragment ids referenced by the index's rows.
    ///
    /// Streams every page, with up to `io_parallelism()` reads in flight, and unions each
    /// page's included fragments.
    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
        let mut frag_ids = RoaringBitmap::default();

        let lazy_reader = LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
        let sub_index_reader = lazy_reader.get().await?;
        let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
            .await
            .buffered(self.store.io_parallelism());
        while let Some(serialized) = reader_stream.try_next().await? {
            let page = FlatIndex::try_new(serialized)?;
            frag_ids |= page.calculate_included_frags()?;
        }

        Ok(frag_ids)
    }
}
1488
1489#[async_trait]
1490impl ScalarIndex for BTreeIndex {
1491 async fn search(
1492 &self,
1493 query: &dyn AnyQuery,
1494 metrics: &dyn MetricsCollector,
1495 ) -> Result<SearchResult> {
1496 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
1497 let mut pages = match query {
1498 SargableQuery::Equals(val) => self
1499 .page_lookup
1500 .pages_eq(&OrderableScalarValue(val.clone())),
1501 SargableQuery::Range(start, end) => self
1502 .page_lookup
1503 .pages_between((wrap_bound(start).as_ref(), wrap_bound(end).as_ref())),
1504 SargableQuery::IsIn(values) => self
1505 .page_lookup
1506 .pages_in(values.iter().map(|val| OrderableScalarValue(val.clone()))),
1507 SargableQuery::FullTextSearch(_) => {
1508 return Err(Error::invalid_input(
1509 "full text search is not supported for BTree index, build a inverted index for it",
1510 ));
1511 }
1512 SargableQuery::IsNull() => self.page_lookup.pages_null(),
1513 SargableQuery::LikePrefix(prefix) => {
1514 match prefix {
1516 ScalarValue::Utf8(Some(s)) => {
1517 let start = Bound::Included(OrderableScalarValue(prefix.clone()));
1518 let end = match compute_next_prefix(s) {
1519 Some(next) => {
1520 Bound::Excluded(OrderableScalarValue(ScalarValue::Utf8(Some(next))))
1521 }
1522 None => Bound::Unbounded,
1523 };
1524 self.page_lookup
1525 .pages_between((start.as_ref(), end.as_ref()))
1526 }
1527 ScalarValue::LargeUtf8(Some(s)) => {
1528 let start = Bound::Included(OrderableScalarValue(prefix.clone()));
1529 let end = match compute_next_prefix(s) {
1530 Some(next) => Bound::Excluded(OrderableScalarValue(
1531 ScalarValue::LargeUtf8(Some(next)),
1532 )),
1533 None => Bound::Unbounded,
1534 };
1535 self.page_lookup
1536 .pages_between((start.as_ref(), end.as_ref()))
1537 }
1538 _ => {
1539 self.page_lookup
1542 .pages_between((Bound::Unbounded, Bound::Unbounded))
1543 }
1544 }
1545 }
1546 };
1547
1548 if !matches!(query, SargableQuery::IsNull()) {
1558 let existing: HashSet<u32> = pages.iter().map(|m| m.page_id()).collect();
1559 for &page_id in self
1560 .page_lookup
1561 .null_pages
1562 .iter()
1563 .chain(self.page_lookup.all_null_pages.iter())
1564 {
1565 if !existing.contains(&page_id) {
1566 pages.push(Matches::Some(page_id));
1567 }
1568 }
1569 }
1570
1571 let lazy_index_reader =
1572 LazyIndexReader::new(self.store.clone(), self.ranges_to_files.clone());
1573 let page_tasks = pages
1574 .into_iter()
1575 .map(|page_index| {
1576 self.search_page(query, page_index, lazy_index_reader.clone(), metrics)
1577 .boxed()
1578 })
1579 .collect::<Vec<_>>();
1580 debug!("Searching {} btree pages", page_tasks.len());
1581
1582 let results: Vec<NullableRowAddrSet> = stream::iter(page_tasks)
1584 .buffered(get_num_compute_intensive_cpus())
1587 .try_collect()
1588 .await?;
1589
1590 let selection = NullableRowAddrSet::union_all(&results);
1592
1593 Ok(SearchResult::Exact(selection))
1594 }
1595
1596 fn can_remap(&self) -> bool {
1597 true
1598 }
1599
1600 async fn remap(
1601 &self,
1602 mapping: &HashMap<u64, Option<u64>>,
1603 dest_store: &dyn IndexStore,
1604 ) -> Result<CreatedIndex> {
1605 let part_page_files: Vec<(Option<u32>, &str)> =
1610 if let Some(ranges_to_files) = &self.ranges_to_files {
1611 ranges_to_files
1613 .iter()
1614 .enumerate()
1615 .map(|(part_id, (_, (path, _)))| (Some(part_id as u32), path.as_str()))
1616 .collect()
1617 } else {
1618 vec![(None, BTREE_PAGES_NAME)]
1620 };
1621
1622 let mapping = Arc::new(mapping.clone());
1623 let train_schema = Arc::new(self.train_schema());
1624
1625 for (part_id, page_file) in part_page_files {
1627 let sub_index_reader = self.store.open_index_file(page_file).await?;
1629 let mapping = mapping.clone();
1630
1631 let train_schema_clone = train_schema.clone();
1632 let train_schema = train_schema.clone();
1633
1634 let remapped_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1635 .await
1636 .buffered(self.store.io_parallelism())
1637 .map_err(DataFusionError::from)
1638 .and_then(move |batch| {
1639 let remapped =
1641 FlatIndex::remap_batch(batch, &mapping).map_err(DataFusionError::from);
1642 let with_train_schema = remapped.and_then(|batch| {
1643 RecordBatch::try_new(train_schema.clone(), batch.columns().to_vec())
1644 .map_err(DataFusionError::from)
1645 });
1646 std::future::ready(with_train_schema)
1647 });
1648
1649 let remapped_stream = Box::pin(RecordBatchStreamAdapter::new(
1650 train_schema_clone,
1651 remapped_stream,
1652 ));
1653
1654 train_btree_index(remapped_stream, dest_store, self.batch_size, None, part_id).await?;
1655 }
1656
1657 if let Some(ranges_to_files) = &self.ranges_to_files {
1658 let num_parts = ranges_to_files.len();
1659 let page_files = (0..num_parts)
1661 .map(|part_id| part_page_data_file_path((part_id as u64) << 32))
1662 .collect::<Vec<_>>();
1663 let lookup_files = (0..num_parts)
1664 .map(|part_id| part_lookup_file_path((part_id as u64) << 32))
1665 .collect::<Vec<_>>();
1666 merge_metadata_files(
1667 dest_store,
1668 &page_files,
1669 &lookup_files,
1670 None,
1671 noop_progress(),
1672 )
1673 .await?;
1674 }
1675
1676 Ok(CreatedIndex {
1677 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1678 .unwrap(),
1679 index_version: BTREE_INDEX_VERSION,
1680 files: Some(dest_store.list_files_with_sizes().await?),
1681 })
1682 }
1683
1684 async fn update(
1685 &self,
1686 new_data: SendableRecordBatchStream,
1687 dest_store: &dyn IndexStore,
1688 old_data_filter: Option<OldIndexDataFilter>,
1689 ) -> Result<CreatedIndex> {
1690 let merged_data_source = self
1692 .clone()
1693 .combine_old_new(new_data, self.batch_size, old_data_filter)
1694 .await?;
1695 train_btree_index(merged_data_source, dest_store, self.batch_size, None, None).await?;
1696
1697 Ok(CreatedIndex {
1698 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1699 .unwrap(),
1700 index_version: BTREE_INDEX_VERSION,
1701 files: Some(dest_store.list_files_with_sizes().await?),
1702 })
1703 }
1704
1705 fn update_criteria(&self) -> UpdateCriteria {
1706 UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
1707 }
1708
1709 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
1710 let params = serde_json::to_value(BTreeParameters {
1711 zone_size: Some(self.batch_size),
1712 range_id: None,
1713 })?;
1714 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::BTree).with_params(¶ms))
1715 }
1716}
1717
/// Lookup statistics for one training page, as computed by `analyze_batch`.
struct BatchStats {
    /// Value at the first row of the page's value column.
    min: ScalarValue,
    /// Value at the last row of the page's value column.
    max: ScalarValue,
    /// Number of nulls in the page's value column.
    null_count: u32,
}
1723
1724fn analyze_batch(batch: &RecordBatch) -> Result<BatchStats> {
1725 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1726 if values.is_empty() {
1727 return Err(Error::internal(
1728 "received an empty batch in btree training".to_string(),
1729 ));
1730 }
1731 let min = ScalarValue::try_from_array(&values, 0)
1732 .map_err(|e| Error::internal(format!("failed to get min value from batch: {}", e)))?;
1733 let max = ScalarValue::try_from_array(&values, values.len() - 1)
1734 .map_err(|e| Error::internal(format!("failed to get max value from batch: {}", e)))?;
1735
1736 Ok(BatchStats {
1737 min,
1738 max,
1739 null_count: values.null_count() as u32,
1740 })
1741}
1742
/// Pluggable sub-index format for BTree data.
///
/// NOTE(review): no implementor or caller of this trait is visible in this part of the
/// file. The per-method notes below are read off the signatures only; confirm them
/// against the implementations.
#[async_trait]
pub trait BTreeSubIndex: Debug + Send + Sync + DeepSizeOf {
    /// Produces a serialized record batch from a training `batch` (presumably the
    /// sub-index's on-disk form — confirm).
    async fn train(&self, batch: RecordBatch) -> Result<RecordBatch>;

    /// Builds a searchable scalar index from a previously serialized batch.
    async fn load_subindex(&self, serialized: RecordBatch) -> Result<Arc<dyn ScalarIndex>>;

    /// Converts a serialized batch back into a plain record batch (presumably the
    /// original training data — confirm).
    async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch>;

    /// Schema of the serialized batches (presumably — confirm).
    fn schema(&self) -> &Arc<Schema>;

    /// Rewrites a serialized batch according to a `u64 -> Option<u64>` id mapping.
    async fn remap_subindex(
        &self,
        serialized: RecordBatch,
        mapping: &HashMap<u64, Option<u64>>,
    ) -> Result<RecordBatch>;
}
1770
/// A page that has been written to a page file, together with its lookup statistics.
struct EncodedBatch {
    /// Min/max/null-count statistics of the page.
    stats: BatchStats,
    /// Zero-based index of the page within the page file being written.
    page_number: u32,
}
1775
1776async fn train_btree_page(
1777 batch: RecordBatch,
1778 batch_idx: u32,
1779 writer: &mut dyn IndexWriter,
1780 schema: Arc<Schema>,
1781) -> Result<EncodedBatch> {
1782 let stats = analyze_batch(&batch)?;
1783
1784 let trained = RecordBatch::try_new(
1786 schema.clone(),
1787 vec![
1788 batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?.clone(),
1789 batch.column_by_name(ROW_ID).expect_ok()?.clone(),
1790 ],
1791 )?;
1792
1793 writer.write_record_batch(trained).await?;
1794 Ok(EncodedBatch {
1795 stats,
1796 page_number: batch_idx,
1797 })
1798}
1799
1800fn btree_stats_as_batch(stats: Vec<EncodedBatch>, value_type: &DataType) -> Result<RecordBatch> {
1801 let mins = if stats.is_empty() {
1802 new_empty_array(value_type)
1803 } else {
1804 ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?
1805 };
1806 let maxs = if stats.is_empty() {
1807 new_empty_array(value_type)
1808 } else {
1809 ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?
1810 };
1811 let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count));
1812 let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number));
1813
1814 let schema = Arc::new(Schema::new(vec![
1815 Field::new("min", mins.data_type().clone(), true),
1817 Field::new("max", maxs.data_type().clone(), true),
1818 Field::new("null_count", null_counts.data_type().clone(), false),
1819 Field::new("page_idx", page_numbers.data_type().clone(), false),
1820 ]));
1821
1822 let columns = vec![
1823 mins,
1824 maxs,
1825 Arc::new(null_counts) as Arc<dyn Array>,
1826 Arc::new(page_numbers) as Arc<dyn Array>,
1827 ];
1828
1829 Ok(RecordBatch::try_new(schema, columns)?)
1830}
1831
1832pub async fn train_btree_index(
1834 batches_source: SendableRecordBatchStream,
1835 index_store: &dyn IndexStore,
1836 batch_size: u64,
1837 fragment_ids: Option<Vec<u32>>,
1838 range_id: Option<u32>,
1839) -> Result<()> {
1840 let partition_id = fragment_ids
1845 .as_ref()
1846 .and_then(|frag_ids| frag_ids.first())
1850 .map(|&first_frag_id| (first_frag_id as u64) << 32)
1851 .or_else(|| range_id.map(|id| (id as u64) << 32));
1855
1856 let flat_schema = Arc::new(Schema::new(vec![
1857 Field::new(
1858 BTREE_VALUES_COLUMN,
1859 batches_source.schema().field(0).data_type().clone(),
1860 true,
1861 ),
1862 Field::new(BTREE_IDS_COLUMN, DataType::UInt64, false),
1863 ]));
1864
1865 let mut sub_index_file = match partition_id {
1866 None => {
1867 index_store
1868 .new_index_file(BTREE_PAGES_NAME, flat_schema.clone())
1869 .await?
1870 }
1871 Some(partition_id) => {
1872 index_store
1873 .new_index_file(
1874 part_page_data_file_path(partition_id).as_str(),
1875 flat_schema.clone(),
1876 )
1877 .await?
1878 }
1879 };
1880
1881 let mut encoded_batches = Vec::new();
1882 let mut batch_idx = 0;
1883
1884 let value_type = batches_source
1885 .schema()
1886 .field_with_name(VALUE_COLUMN_NAME)?
1887 .data_type()
1888 .clone();
1889
1890 let mut batches_source = chunk_concat_stream(batches_source, batch_size as usize);
1891
1892 while let Some(batch) = batches_source.try_next().await? {
1893 encoded_batches.push(
1894 train_btree_page(
1895 batch,
1896 batch_idx,
1897 sub_index_file.as_mut(),
1898 flat_schema.clone(),
1899 )
1900 .await?,
1901 );
1902 batch_idx += 1;
1903 }
1904 sub_index_file.finish().await?;
1905 let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?;
1906 let mut file_schema = record_batch.schema().as_ref().clone();
1907 file_schema
1908 .metadata
1909 .insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
1910 file_schema.metadata.insert(
1911 RANGE_PARTITIONED_META_KEY.to_string(),
1912 range_id.is_some().to_string(),
1913 );
1914 let mut btree_index_file = match partition_id {
1915 None => {
1916 index_store
1917 .new_index_file(BTREE_LOOKUP_NAME, Arc::new(file_schema))
1918 .await?
1919 }
1920 Some(partition_id) => {
1921 index_store
1922 .new_index_file(
1923 part_lookup_file_path(partition_id).as_str(),
1924 Arc::new(file_schema),
1925 )
1926 .await?
1927 }
1928 };
1929 btree_index_file.write_record_batch(record_batch).await?;
1930 btree_index_file.finish().await?;
1931 Ok(())
1932}
1933
1934pub async fn merge_index_files(
1935 object_store: &ObjectStore,
1936 index_dir: &Path,
1937 store: Arc<dyn IndexStore>,
1938 batch_readhead: Option<usize>,
1939 progress: Arc<dyn IndexBuildProgress>,
1940) -> Result<()> {
1941 let (part_page_files, part_lookup_files) =
1943 list_page_lookup_files(object_store, index_dir).await?;
1944 merge_metadata_files(
1945 store.as_ref(),
1946 &part_page_files,
1947 &part_lookup_files,
1948 batch_readhead,
1949 progress,
1950 )
1951 .await
1952}
1953
1954async fn list_page_lookup_files(
1957 object_store: &ObjectStore,
1958 index_dir: &Path,
1959) -> Result<(Vec<String>, Vec<String>)> {
1960 let mut part_page_files = Vec::new();
1961 let mut part_lookup_files = Vec::new();
1962
1963 let mut list_stream = object_store.list(Some(index_dir.clone()));
1964
1965 while let Some(item) = list_stream.next().await {
1966 match item {
1967 Ok(meta) => {
1968 let file_name = meta.location.filename().unwrap_or_default();
1969 if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") {
1971 part_page_files.push(file_name.to_string());
1972 }
1973 if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") {
1975 part_lookup_files.push(file_name.to_string());
1976 }
1977 }
1978 Err(_) => continue,
1979 }
1980 }
1981
1982 if part_page_files.is_empty() || part_lookup_files.is_empty() {
1983 return Err(Error::internal(format!(
1984 "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})",
1985 index_dir,
1986 part_page_files.len(),
1987 part_lookup_files.len()
1988 )));
1989 }
1990
1991 Ok((part_page_files, part_lookup_files))
1992}
1993
1994async fn merge_metadata_files(
2001 store: &dyn IndexStore,
2002 part_page_files: &[String],
2003 part_lookup_files: &[String],
2004 batch_readhead: Option<usize>,
2005 progress: Arc<dyn IndexBuildProgress>,
2006) -> Result<()> {
2007 if part_lookup_files.is_empty() || part_page_files.is_empty() {
2008 return Err(Error::internal(
2009 "No partition files provided for merging".to_string(),
2010 ));
2011 }
2012
2013 if part_lookup_files.len() != part_page_files.len() {
2015 return Err(Error::internal(format!(
2016 "Number of partition lookup files ({}) does not match number of partition page files ({})",
2017 part_lookup_files.len(),
2018 part_page_files.len()
2019 )));
2020 }
2021 let mut page_files_map = HashMap::new();
2022 for page_file in part_page_files {
2023 let partition_id = extract_partition_id(page_file)?;
2024 page_files_map.insert(partition_id, page_file);
2025 }
2026
2027 for lookup_file in part_lookup_files {
2029 let partition_id = extract_partition_id(lookup_file)?;
2030 if !page_files_map.contains_key(&partition_id) {
2031 return Err(Error::internal(format!(
2032 "No corresponding page file found for lookup file: {} (partition_id: {})",
2033 lookup_file, partition_id
2034 )));
2035 }
2036 }
2037
2038 let first_lookup_reader = store.open_index_file(&part_lookup_files[0]).await?;
2040 let batch_size = first_lookup_reader
2041 .schema()
2042 .metadata
2043 .get(BATCH_SIZE_META_KEY)
2044 .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
2045 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
2046 let range_partitioned = first_lookup_reader
2047 .schema()
2048 .metadata
2049 .get(RANGE_PARTITIONED_META_KEY)
2050 .map(|bs| bs.parse().unwrap_or(DEFAULT_RANGE_PARTITIONED))
2051 .unwrap_or(DEFAULT_RANGE_PARTITIONED);
2052
2053 let value_type = first_lookup_reader
2055 .schema()
2056 .fields
2057 .first()
2058 .unwrap()
2059 .data_type();
2060
2061 let mut metadata = HashMap::new();
2062 metadata.insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
2063 let lookup_schema = Arc::new(Schema::new(vec![
2064 Field::new("min", value_type.clone(), true),
2065 Field::new("max", value_type.clone(), true),
2066 Field::new("null_count", DataType::UInt32, false),
2067 Field::new("page_idx", DataType::UInt32, false),
2068 ]));
2069
2070 if range_partitioned {
2072 merge_range_partitioned_lookups(
2073 store,
2074 part_lookup_files,
2075 lookup_schema,
2076 metadata,
2077 batch_size,
2078 batch_readhead,
2079 progress,
2080 )
2081 .await
2082 } else {
2083 merge_pages_and_lookups(
2084 store,
2085 part_page_files,
2086 part_lookup_files,
2087 &page_files_map,
2088 lookup_schema,
2089 metadata,
2090 batch_size,
2091 batch_readhead,
2092 progress,
2093 )
2094 .await
2095 }
2096}
2097
2098async fn merge_range_partitioned_lookups(
2128 store: &dyn IndexStore,
2129 part_lookup_files: &[String],
2130 lookup_schema: Arc<Schema>,
2131 mut metadata: HashMap<String, String>,
2132 batch_size: u64,
2133 batch_readhead: Option<usize>,
2134 progress: Arc<dyn IndexBuildProgress>,
2135) -> Result<()> {
2136 let sorted_part_lookup_files = sort_files_by_partition_id(part_lookup_files)?;
2137 let mut lookup_file = store
2138 .new_index_file(BTREE_LOOKUP_NAME, lookup_schema)
2139 .await?;
2140
2141 let mut pages_per_file: Vec<(u64, u32)> = Vec::with_capacity(sorted_part_lookup_files.len());
2143 let mut num_pages_written = 0u32;
2144
2145 progress
2146 .stage_start(
2147 "merge_lookups",
2148 Some(sorted_part_lookup_files.len() as u64),
2149 "files",
2150 )
2151 .await?;
2152
2153 for (idx, (part_id, part_lookup_file)) in sorted_part_lookup_files.into_iter().enumerate() {
2154 let lookup_reader = store.open_index_file(&part_lookup_file).await?;
2155 let reader_stream = IndexReaderStream::new(lookup_reader.clone(), batch_size).await;
2156 let mut stream = reader_stream.buffered(batch_readhead.unwrap_or(1)).boxed();
2157 while let Some(batch) = stream.next().await {
2158 let original_batch = batch?;
2159 let modified_batch = add_offset_to_page_idx(&original_batch, num_pages_written)?;
2160 lookup_file.write_record_batch(modified_batch).await?;
2161 }
2162 pages_per_file.push((part_id, lookup_reader.num_rows() as u32));
2163 num_pages_written += lookup_reader.num_rows() as u32;
2164 progress
2165 .stage_progress("merge_lookups", idx as u64 + 1)
2166 .await?;
2167 }
2168
2169 metadata.insert(RANGE_PARTITIONED_META_KEY.to_string(), "true".to_string());
2170 metadata.insert(
2171 PAGE_NUM_PER_RANGE_PARTITION_META_KEY.to_string(),
2172 serde_json::to_string(&pages_per_file)?,
2173 );
2174
2175 lookup_file.finish_with_metadata(metadata).await?;
2176 progress.stage_complete("merge_lookups").await?;
2177
2178 cleanup_partition_files(store, part_lookup_files, &[]).await;
2180 Ok(())
2181}
2182
2183#[allow(clippy::too_many_arguments)]
2189async fn merge_pages_and_lookups(
2190 store: &dyn IndexStore,
2191 part_page_files: &[String],
2192 part_lookup_files: &[String],
2193 page_files_map: &HashMap<u64, &String>,
2194 lookup_schema: Arc<Schema>,
2195 metadata: HashMap<String, String>,
2196 batch_size: u64,
2197 batch_readhead: Option<usize>,
2198 progress: Arc<dyn IndexBuildProgress>,
2199) -> Result<()> {
2200 let partition_id = extract_partition_id(part_lookup_files[0].as_str())?;
2202 let page_file = page_files_map.get(&partition_id).unwrap();
2203 let page_reader = store.open_index_file(page_file).await?;
2204 let page_schema = page_reader.schema().clone();
2205
2206 let arrow_schema = Arc::new(Schema::from(&page_schema));
2207 let mut page_file = store
2208 .new_index_file(BTREE_PAGES_NAME, arrow_schema.clone())
2209 .await?;
2210 progress.stage_start("merge_pages", None, "pages").await?;
2211 let lookup_entries = merge_pages(
2212 part_lookup_files,
2213 page_files_map,
2214 store,
2215 batch_size,
2216 &mut page_file,
2217 arrow_schema.clone(),
2218 batch_readhead,
2219 progress.clone(),
2220 )
2221 .await?;
2222 page_file.finish().await?;
2223 progress.stage_complete("merge_pages").await?;
2224
2225 let lookup_batch = RecordBatch::try_new(
2226 lookup_schema.clone(),
2227 vec![
2228 ScalarValue::iter_to_array(lookup_entries.iter().map(|(min, _, _, _)| min.clone()))?,
2229 ScalarValue::iter_to_array(lookup_entries.iter().map(|(_, max, _, _)| max.clone()))?,
2230 Arc::new(UInt32Array::from_iter_values(
2231 lookup_entries
2232 .iter()
2233 .map(|(_, _, null_count, _)| *null_count),
2234 )),
2235 Arc::new(UInt32Array::from_iter_values(
2236 lookup_entries.iter().map(|(_, _, _, page_idx)| *page_idx),
2237 )),
2238 ],
2239 )?;
2240 let mut lookup_file = store
2241 .new_index_file(BTREE_LOOKUP_NAME, lookup_schema)
2242 .await?;
2243 progress
2244 .stage_start("write_lookup_file", Some(1), "files")
2245 .await?;
2246 lookup_file.write_record_batch(lookup_batch).await?;
2247 lookup_file.finish_with_metadata(metadata).await?;
2248 progress.stage_progress("write_lookup_file", 1).await?;
2249 progress.stage_complete("write_lookup_file").await?;
2250
2251 cleanup_partition_files(store, part_lookup_files, part_page_files).await;
2254
2255 Ok(())
2256}
2257
2258fn add_offset_to_page_idx(batch: &RecordBatch, offset: u32) -> Result<RecordBatch> {
2260 let (page_idx_pos, _) = batch.schema().column_with_name("page_idx").ok_or_else(|| {
2261 Error::internal("Column 'page_idx' not found in RecordBatch schema".to_string())
2262 })?;
2263 let page_idx_array = batch
2264 .column(page_idx_pos)
2265 .as_any()
2266 .downcast_ref::<UInt32Array>()
2267 .ok_or_else(|| {
2268 Error::internal("Failed to downcast 'page_idx' column to UInt32Array".to_string())
2269 })?;
2270 let offset_array = UInt32Array::from(vec![offset; page_idx_array.len()]);
2271 let new_page_idx_array_ref = add(page_idx_array, &offset_array)?;
2272 let mut new_columns = batch.columns().to_vec();
2273 new_columns[page_idx_pos] = new_page_idx_array_ref;
2274 let new_batch = RecordBatch::try_new(batch.schema(), new_columns)?;
2275 Ok(new_batch)
2276}
2277
2278#[allow(clippy::too_many_arguments)]
2281async fn merge_pages(
2282 part_lookup_files: &[String],
2283 page_files_map: &HashMap<u64, &String>,
2284 store: &dyn IndexStore,
2285 batch_size: u64,
2286 page_file: &mut Box<dyn IndexWriter>,
2287 arrow_schema: Arc<Schema>,
2288 batch_readhead: Option<usize>,
2289 progress: Arc<dyn IndexBuildProgress>,
2290) -> Result<Vec<(ScalarValue, ScalarValue, u32, u32)>> {
2291 let mut lookup_entries = Vec::new();
2292 let mut page_idx = 0u32;
2293
2294 debug!(
2295 "Starting SortPreservingMerge with {} partitions",
2296 part_lookup_files.len()
2297 );
2298
2299 let value_field = arrow_schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
2300 let row_id_field = arrow_schema.field(1).clone().with_name(ROW_ID);
2301 let stream_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
2302
2303 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = Vec::new();
2305 for lookup_file in part_lookup_files {
2306 let partition_id = extract_partition_id(lookup_file)?;
2307 let page_file_name = (*page_files_map.get(&partition_id).ok_or_else(|| {
2308 Error::internal(format!(
2309 "Page file not found for partition ID: {}",
2310 partition_id
2311 ))
2312 })?)
2313 .clone();
2314
2315 let reader = store.open_index_file(&page_file_name).await?;
2316
2317 let reader_stream = IndexReaderStream::new(reader, batch_size).await;
2318
2319 let stream = reader_stream
2320 .map(|fut| fut.map_err(DataFusionError::from))
2321 .buffered(batch_readhead.unwrap_or(1))
2322 .boxed();
2323
2324 let sendable_stream =
2325 Box::pin(RecordBatchStreamAdapter::new(stream_schema.clone(), stream));
2326 inputs.push(Arc::new(OneShotExec::new(sendable_stream)));
2327 }
2328
2329 let union_inputs = UnionExec::try_new(inputs)?;
2331
2332 let value_column_index = stream_schema.index_of(VALUE_COLUMN_NAME)?;
2334 let sort_expr = PhysicalSortExpr {
2335 expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
2336 options: SortOptions {
2337 descending: false,
2338 nulls_first: true,
2339 },
2340 };
2341
2342 let merge_exec = Arc::new(SortPreservingMergeExec::new(
2343 [sort_expr].into(),
2344 union_inputs,
2345 ));
2346
2347 let unchunked = execute_plan(
2348 merge_exec,
2349 LanceExecutionOptions {
2350 use_spilling: false,
2351 ..Default::default()
2352 },
2353 )?;
2354
2355 let mut chunked_stream = chunk_concat_stream(unchunked, batch_size as usize);
2357
2358 while let Some(batch) = chunked_stream.try_next().await? {
2360 let writer_batch = RecordBatch::try_new(
2361 arrow_schema.clone(),
2362 vec![batch.column(0).clone(), batch.column(1).clone()],
2363 )?;
2364
2365 page_file.write_record_batch(writer_batch).await?;
2366
2367 let min_val = ScalarValue::try_from_array(batch.column(0), 0)?;
2368 let max_val = ScalarValue::try_from_array(batch.column(0), batch.num_rows() - 1)?;
2369 let null_count = batch.column(0).null_count() as u32;
2370
2371 lookup_entries.push((min_val, max_val, null_count, page_idx));
2372 page_idx += 1;
2373 progress
2374 .stage_progress("merge_pages", page_idx as u64)
2375 .await?;
2376 }
2377
2378 Ok(lookup_entries)
2379}
2380
2381fn sort_files_by_partition_id(part_files: &[String]) -> Result<Vec<(u64, String)>> {
2383 let mut files_with_ids: Vec<(u64, &String)> = part_files
2384 .iter()
2385 .map(|file| extract_partition_id(file).map(|id| (id, file)))
2386 .collect::<Result<Vec<_>>>()?;
2387
2388 files_with_ids.sort_unstable_by_key(|k| k.0);
2389
2390 let sorted_files = files_with_ids
2391 .into_iter()
2392 .map(|(id, file)| (id, file.clone()))
2393 .collect();
2394
2395 Ok(sorted_files)
2396}
2397
2398fn extract_partition_id(filename: &str) -> Result<u64> {
2401 if !filename.starts_with("part_") {
2402 return Err(Error::internal(format!(
2403 "Invalid partition file name format: {}",
2404 filename
2405 )));
2406 }
2407
2408 let parts: Vec<&str> = filename.split('_').collect();
2409 if parts.len() < 3 {
2410 return Err(Error::internal(format!(
2411 "Invalid partition file name format: {}",
2412 filename
2413 )));
2414 }
2415
2416 parts[1].parse::<u64>().map_err(|_| {
2417 Error::internal(format!(
2418 "Failed to parse partition ID from filename: {}",
2419 filename
2420 ))
2421 })
2422}
2423
2424async fn cleanup_partition_files(
2429 store: &dyn IndexStore,
2430 part_lookup_files: &[String],
2431 part_page_files: &[String],
2432) {
2433 for file_name in part_lookup_files {
2435 cleanup_single_file(
2436 store,
2437 file_name,
2438 "part_",
2439 "_page_lookup.lance",
2440 "partition lookup",
2441 )
2442 .await;
2443 }
2444
2445 for file_name in part_page_files {
2447 cleanup_single_file(
2448 store,
2449 file_name,
2450 "part_",
2451 "_page_data.lance",
2452 "partition page",
2453 )
2454 .await;
2455 }
2456}
2457
2458async fn cleanup_single_file(
2462 store: &dyn IndexStore,
2463 file_name: &str,
2464 expected_prefix: &str,
2465 expected_suffix: &str,
2466 file_type: &str,
2467) {
2468 if file_name.starts_with(expected_prefix) && file_name.ends_with(expected_suffix) {
2469 match store.delete_index_file(file_name).await {
2470 Ok(()) => {
2471 debug!("Successfully deleted {} file: {}", file_type, file_name);
2472 }
2473 Err(e) => {
2474 warn!(
2475 "Failed to delete {} file '{}': {}. \
2476 This does not affect the merge operation, but may leave \
2477 partition files that should be cleaned up manually.",
2478 file_type, file_name, e
2479 );
2480 }
2481 }
2482 } else {
2483 warn!(
2485 "Skipping deletion of file '{}' as it does not match the expected \
2486 {} file pattern ({}*{})",
2487 file_name, file_type, expected_prefix, expected_suffix
2488 );
2489 }
2490}
2491
2492pub(crate) fn part_page_data_file_path(partition_id: u64) -> String {
2493 format!("part_{}_{}", partition_id, BTREE_PAGES_NAME)
2494}
2495
2496pub(crate) fn part_lookup_file_path(partition_id: u64) -> String {
2497 format!("part_{}_{}", partition_id, BTREE_LOOKUP_NAME)
2498}
2499
2500struct IndexReaderStream {
2504 reader: Arc<dyn IndexReader>,
2505 batch_size: u64,
2506 num_batches: u32,
2507 batch_idx: u32,
2508}
2509
2510impl IndexReaderStream {
2511 async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
2512 let num_batches = reader.num_batches(batch_size).await;
2513 Self {
2514 reader,
2515 batch_size,
2516 num_batches,
2517 batch_idx: 0,
2518 }
2519 }
2520}
2521
2522impl Stream for IndexReaderStream {
2523 type Item = BoxFuture<'static, Result<RecordBatch>>;
2524
2525 fn poll_next(
2526 self: std::pin::Pin<&mut Self>,
2527 _cx: &mut std::task::Context<'_>,
2528 ) -> std::task::Poll<Option<Self::Item>> {
2529 let this = self.get_mut();
2530 if this.batch_idx >= this.num_batches {
2531 return std::task::Poll::Ready(None);
2532 }
2533 let batch_num = this.batch_idx;
2534 this.batch_idx += 1;
2535 let reader_copy = this.reader.clone();
2536 let batch_size = this.batch_size;
2537 let read_task = async move {
2538 reader_copy
2539 .read_record_batch(batch_num as u64, batch_size)
2540 .await
2541 }
2542 .boxed();
2543 std::task::Poll::Ready(Some(read_task))
2544 }
2545}
2546
/// User-supplied training parameters for a BTree index, deserialized from the JSON
/// `params` string in `BTreeIndexPlugin::new_training_request`.
#[derive(Debug, Serialize, Deserialize)]
pub struct BTreeParameters {
    /// Size argument passed to `train_btree_index` (presumably rows per page/zone —
    /// confirm against `train_btree_index`); `train_index` substitutes
    /// `DEFAULT_BTREE_BATCH_SIZE` when this is `None`.
    pub zone_size: Option<u64>,

    /// Optional range identifier forwarded unchanged to `train_btree_index`.
    /// NOTE(review): its meaning is defined by `train_btree_index`, which is not
    /// visible here — confirm before relying on it.
    pub range_id: Option<u32>,
}
2578
2579struct BTreeTrainingRequest {
2580 parameters: BTreeParameters,
2581 criteria: TrainingCriteria,
2582}
2583
2584impl BTreeTrainingRequest {
2585 pub fn new(parameters: BTreeParameters) -> Self {
2586 Self {
2587 parameters,
2588 criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
2590 }
2591 }
2592}
2593
2594impl TrainingRequest for BTreeTrainingRequest {
2595 fn as_any(&self) -> &dyn std::any::Any {
2596 self
2597 }
2598
2599 fn criteria(&self) -> &TrainingCriteria {
2600 &self.criteria
2601 }
2602}
2603
2604#[derive(Debug, Default)]
2605pub struct BTreeIndexPlugin;
2606
2607#[async_trait]
2608impl ScalarIndexPlugin for BTreeIndexPlugin {
2609 fn name(&self) -> &str {
2610 "BTree"
2611 }
2612
2613 fn new_training_request(
2614 &self,
2615 params: &str,
2616 field: &Field,
2617 ) -> Result<Box<dyn TrainingRequest>> {
2618 if field.data_type().is_nested() {
2619 return Err(Error::invalid_input_source(
2620 "A btree index can only be created on a non-nested field.".into(),
2621 ));
2622 }
2623
2624 let params = serde_json::from_str::<BTreeParameters>(params)?;
2625 Ok(Box::new(BTreeTrainingRequest::new(params)))
2626 }
2627
2628 fn provides_exact_answer(&self) -> bool {
2629 true
2630 }
2631
2632 fn version(&self) -> u32 {
2633 BTREE_INDEX_VERSION
2634 }
2635
2636 fn new_query_parser(
2637 &self,
2638 index_name: String,
2639 _index_details: &prost_types::Any,
2640 ) -> Option<Box<dyn ScalarQueryParser>> {
2641 Some(Box::new(SargableQueryParser::new(index_name, false)))
2642 }
2643
2644 async fn train_index(
2645 &self,
2646 data: SendableRecordBatchStream,
2647 index_store: &dyn IndexStore,
2648 request: Box<dyn TrainingRequest>,
2649 fragment_ids: Option<Vec<u32>>,
2650 _progress: Arc<dyn crate::progress::IndexBuildProgress>,
2651 ) -> Result<CreatedIndex> {
2652 let request = request
2653 .as_any()
2654 .downcast_ref::<BTreeTrainingRequest>()
2655 .unwrap();
2656 train_btree_index(
2657 data,
2658 index_store,
2659 request
2660 .parameters
2661 .zone_size
2662 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE),
2663 fragment_ids,
2664 request.parameters.range_id,
2665 )
2666 .await?;
2667 Ok(CreatedIndex {
2668 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
2669 .unwrap(),
2670 index_version: BTREE_INDEX_VERSION,
2671 files: Some(index_store.list_files_with_sizes().await?),
2672 })
2673 }
2674
2675 async fn load_index(
2676 &self,
2677 index_store: Arc<dyn IndexStore>,
2678 _index_details: &prost_types::Any,
2679 frag_reuse_index: Option<Arc<FragReuseIndex>>,
2680 cache: &LanceCache,
2681 ) -> Result<Arc<dyn ScalarIndex>> {
2682 Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
2683 }
2684}
2685
2686#[cfg(test)]
2687mod tests {
2688 use std::sync::atomic::Ordering;
2689 use std::{collections::HashMap, sync::Arc};
2690
2691 use arrow::datatypes::{Float32Type, Float64Type, Int32Type, UInt64Type};
2692 use arrow_array::{FixedSizeListArray, record_batch};
2693 use datafusion::{
2694 execution::{SendableRecordBatchStream, TaskContext},
2695 physical_plan::{ExecutionPlan, sorts::sort::SortExec, stream::RecordBatchStreamAdapter},
2696 };
2697 use datafusion_common::{DataFusionError, ScalarValue};
2698 use datafusion_physical_expr::{PhysicalSortExpr, expressions::col};
2699 use deepsize::DeepSizeOf;
2700 use futures::TryStreamExt;
2701 use futures::stream;
2702 use lance_core::utils::mask::RowSetOps;
2703 use lance_core::utils::tempfile::TempObjDir;
2704 use lance_core::{cache::LanceCache, utils::mask::RowAddrTreeMap};
2705 use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt};
2706 use lance_datagen::{ArrayGeneratorExt, BatchCount, RowCount, array, gen_batch};
2707 use lance_io::object_store::ObjectStore;
2708 use object_store::path::Path;
2709
2710 use crate::metrics::LocalMetricsCollector;
2711 use crate::progress::{IndexBuildProgress, noop_progress};
2712 use crate::{
2713 metrics::NoOpMetricsCollector,
2714 scalar::{
2715 IndexStore, OldIndexDataFilter, SargableQuery, ScalarIndex, SearchResult,
2716 btree::{BTREE_PAGES_NAME, BTreeIndex},
2717 lance_format::LanceIndexStore,
2718 },
2719 };
2720
2721 use super::{
2722 DEFAULT_BTREE_BATCH_SIZE, OrderableScalarValue, part_lookup_file_path,
2723 part_page_data_file_path, train_btree_index,
2724 };
2725
2726 lance_testing::define_stage_event_progress!(
2727 RecordingProgress,
2728 IndexBuildProgress,
2729 lance_core::Result<()>
2730 );
2731 #[test]
2732 fn test_scalar_value_size() {
2733 let size_of_i32 = OrderableScalarValue(ScalarValue::Int32(Some(0))).deep_size_of();
2734 let size_of_many_i32 = OrderableScalarValue(ScalarValue::FixedSizeList(Arc::new(
2735 FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2736 vec![Some(vec![Some(0); 128])],
2737 128,
2738 ),
2739 )))
2740 .deep_size_of();
2741
2742 assert!(size_of_i32 > 4);
2744 assert!(size_of_many_i32 > 128 * 4);
2745 }
2746
2747 #[tokio::test]
2748 async fn test_null_ids() {
2749 let tmpdir = TempObjDir::default();
2750 let test_store = Arc::new(LanceIndexStore::new(
2751 Arc::new(ObjectStore::local()),
2752 tmpdir.clone(),
2753 Arc::new(LanceCache::no_cache()),
2754 ));
2755
2756 let stream = gen_batch()
2758 .col(
2759 "value",
2760 array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
2761 )
2762 .col("_rowid", array::step::<UInt64Type>())
2763 .into_df_stream(RowCount::from(5000), BatchCount::from(10));
2764
2765 train_btree_index(stream, test_store.as_ref(), 5000, None, None)
2766 .await
2767 .unwrap();
2768
2769 let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2770 .await
2771 .unwrap();
2772
2773 assert_eq!(index.page_lookup.null_pages.len(), 10);
2774
2775 let remap_dir = TempObjDir::default();
2776 let remap_store = Arc::new(LanceIndexStore::new(
2777 Arc::new(ObjectStore::local()),
2778 remap_dir.clone(),
2779 Arc::new(LanceCache::no_cache()),
2780 ));
2781
2782 index
2784 .remap(&HashMap::default(), remap_store.as_ref())
2785 .await
2786 .unwrap();
2787
2788 let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
2789 .await
2790 .unwrap();
2791
2792 assert_eq!(remap_index.page_lookup, index.page_lookup);
2793
2794 let original_pages = test_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2795 let remapped_pages = remap_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2796
2797 assert_eq!(original_pages.num_rows(), remapped_pages.num_rows());
2798
2799 let original_data = original_pages
2800 .read_record_batch(0, original_pages.num_rows() as u64)
2801 .await
2802 .unwrap();
2803 let remapped_data = remapped_pages
2804 .read_record_batch(0, remapped_pages.num_rows() as u64)
2805 .await
2806 .unwrap();
2807
2808 assert_eq!(original_data, remapped_data);
2809 }
2810
2811 #[tokio::test]
2812 async fn test_nan_ordering() {
2813 let tmpdir = TempObjDir::default();
2814 let test_store = Arc::new(LanceIndexStore::new(
2815 Arc::new(ObjectStore::local()),
2816 tmpdir.clone(),
2817 Arc::new(LanceCache::no_cache()),
2818 ));
2819
2820 let values = vec![
2821 0.0,
2822 1.0,
2823 2.0,
2824 3.0,
2825 f64::NAN,
2826 f64::NEG_INFINITY,
2827 f64::INFINITY,
2828 ];
2829
2830 let data = gen_batch()
2834 .col("value", array::cycle::<Float64Type>(values.clone()))
2835 .col("_rowid", array::step::<UInt64Type>())
2836 .into_df_exec(RowCount::from(10), BatchCount::from(100));
2837 let schema = data.schema();
2838 let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2839 let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2840 let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2841 let stream = break_stream(stream, 64);
2842 let stream = stream.map_err(DataFusionError::from);
2843 let stream =
2844 Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2845
2846 train_btree_index(stream, test_store.as_ref(), 64, None, None)
2847 .await
2848 .unwrap();
2849
2850 let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2851 .await
2852 .unwrap();
2853
2854 for (idx, value) in values.into_iter().enumerate() {
2855 let query = SargableQuery::Equals(ScalarValue::Float64(Some(value)));
2856 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2857 assert_eq!(
2858 result,
2859 SearchResult::exact(RowAddrTreeMap::from_iter(((idx as u64)..1000).step_by(7)))
2860 );
2861 }
2862 }
2863
2864 #[tokio::test]
2865 async fn test_page_cache() {
2866 let tmpdir = TempObjDir::default();
2867 let test_store = Arc::new(LanceIndexStore::new(
2868 Arc::new(ObjectStore::local()),
2869 tmpdir.clone(),
2870 Arc::new(LanceCache::no_cache()),
2871 ));
2872
2873 let data = gen_batch()
2874 .col("value", array::step::<Float32Type>())
2875 .col("_rowid", array::step::<UInt64Type>())
2876 .into_df_exec(RowCount::from(1000), BatchCount::from(10));
2877 let schema = data.schema();
2878 let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2879 let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2880 let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2881 let stream = break_stream(stream, 64);
2882 let stream = stream.map_err(DataFusionError::from);
2883 let stream =
2884 Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2885
2886 train_btree_index(stream, test_store.as_ref(), 64, None, None)
2887 .await
2888 .unwrap();
2889
2890 let cache = Arc::new(LanceCache::with_capacity(100 * 1024 * 1024));
2891 let index = BTreeIndex::load(test_store, None, cache.as_ref())
2892 .await
2893 .unwrap();
2894
2895 let query = SargableQuery::Equals(ScalarValue::Float32(Some(0.0)));
2896 let metrics = LocalMetricsCollector::default();
2897 let query1 = index.search(&query, &metrics);
2898 let query2 = index.search(&query, &metrics);
2899 tokio::join!(query1, query2).0.unwrap();
2900 assert_eq!(metrics.parts_loaded.load(Ordering::Relaxed), 1);
2901 }
2902
2903 #[tokio::test]
2904 async fn test_like_prefix_search() {
2905 use arrow::datatypes::DataType;
2906 use arrow_array::StringArray;
2907
2908 let tmpdir = TempObjDir::default();
2909 let test_store = Arc::new(LanceIndexStore::new(
2910 Arc::new(ObjectStore::local()),
2911 tmpdir.clone(),
2912 Arc::new(LanceCache::no_cache()),
2913 ));
2914
2915 let values = vec![
2917 "apple",
2918 "app",
2919 "application",
2920 "banana",
2921 "band",
2922 "test_ns$table1",
2923 "test_ns$table2",
2924 "test_ns2$table1",
2925 "test",
2926 "testing",
2927 ];
2928 let row_ids: Vec<u64> = (0..values.len() as u64).collect();
2929
2930 let schema = Arc::new(arrow::datatypes::Schema::new(vec![
2931 arrow::datatypes::Field::new("value", DataType::Utf8, false),
2932 arrow::datatypes::Field::new("_rowid", DataType::UInt64, false),
2933 ]));
2934
2935 let batch = arrow::record_batch::RecordBatch::try_new(
2936 schema.clone(),
2937 vec![
2938 Arc::new(StringArray::from(values.clone())),
2939 Arc::new(arrow_array::UInt64Array::from(row_ids)),
2940 ],
2941 )
2942 .unwrap();
2943
2944 let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
2945 schema,
2946 stream::once(async { Ok(batch) }),
2947 ));
2948
2949 train_btree_index(stream, test_store.as_ref(), 100, None, None)
2950 .await
2951 .unwrap();
2952
2953 let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2954 .await
2955 .unwrap();
2956
2957 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("app".to_string())));
2959 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2960
2961 match &result {
2962 SearchResult::Exact(row_ids) => {
2963 let ids: Vec<u64> = row_ids
2964 .true_rows()
2965 .row_addrs()
2966 .unwrap()
2967 .map(u64::from)
2968 .collect();
2969 assert!(ids.contains(&0), "Should contain row 0 (apple)");
2970 assert!(ids.contains(&1), "Should contain row 1 (app)");
2971 assert!(ids.contains(&2), "Should contain row 2 (application)");
2972 assert!(!ids.contains(&3), "Should not contain row 3 (banana)");
2973 }
2974 _ => panic!("Expected Exact result"),
2975 }
2976
2977 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test_ns$".to_string())));
2979 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2980
2981 match &result {
2982 SearchResult::Exact(row_ids) => {
2983 let ids: Vec<u64> = row_ids
2984 .true_rows()
2985 .row_addrs()
2986 .unwrap()
2987 .map(u64::from)
2988 .collect();
2989 assert!(ids.contains(&5), "Should contain row 5 (test_ns$table1)");
2990 assert!(ids.contains(&6), "Should contain row 6 (test_ns$table2)");
2991 assert!(
2992 !ids.contains(&7),
2993 "Should not contain row 7 (test_ns2$table1)"
2994 );
2995 }
2996 _ => panic!("Expected Exact result"),
2997 }
2998
2999 let query = SargableQuery::LikePrefix(ScalarValue::Utf8(Some("test".to_string())));
3001 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3002
3003 match &result {
3004 SearchResult::Exact(row_ids) => {
3005 let ids: Vec<u64> = row_ids
3006 .true_rows()
3007 .row_addrs()
3008 .unwrap()
3009 .map(u64::from)
3010 .collect();
3011 assert!(
3012 ids.contains(&5),
3013 "Should contain row 5 (test_ns$table1): {:?}",
3014 ids
3015 );
3016 assert!(
3017 ids.contains(&6),
3018 "Should contain row 6 (test_ns$table2): {:?}",
3019 ids
3020 );
3021 assert!(
3022 ids.contains(&7),
3023 "Should contain row 7 (test_ns2$table1): {:?}",
3024 ids
3025 );
3026 assert!(ids.contains(&8), "Should contain row 8 (test): {:?}", ids);
3027 assert!(
3028 ids.contains(&9),
3029 "Should contain row 9 (testing): {:?}",
3030 ids
3031 );
3032 }
3033 _ => panic!("Expected Exact result"),
3034 }
3035 }
3036
3037 #[tokio::test]
3038 async fn test_like_prefix_search_large_utf8() {
3039 use arrow::datatypes::DataType;
3040 use arrow_array::LargeStringArray;
3041
3042 let tmpdir = TempObjDir::default();
3043 let test_store = Arc::new(LanceIndexStore::new(
3044 Arc::new(ObjectStore::local()),
3045 tmpdir.clone(),
3046 Arc::new(LanceCache::no_cache()),
3047 ));
3048
3049 let values = vec!["apple", "app", "application", "banana"];
3050 let row_ids: Vec<u64> = (0..values.len() as u64).collect();
3051
3052 let schema = Arc::new(arrow::datatypes::Schema::new(vec![
3053 arrow::datatypes::Field::new("value", DataType::LargeUtf8, false),
3054 arrow::datatypes::Field::new("_rowid", DataType::UInt64, false),
3055 ]));
3056
3057 let batch = arrow::record_batch::RecordBatch::try_new(
3058 schema.clone(),
3059 vec![
3060 Arc::new(LargeStringArray::from(values)),
3061 Arc::new(arrow_array::UInt64Array::from(row_ids)),
3062 ],
3063 )
3064 .unwrap();
3065
3066 let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
3067 schema,
3068 stream::once(async { Ok(batch) }),
3069 ));
3070
3071 train_btree_index(stream, test_store.as_ref(), 100, None, None)
3072 .await
3073 .unwrap();
3074
3075 let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
3076 .await
3077 .unwrap();
3078
3079 let query = SargableQuery::LikePrefix(ScalarValue::LargeUtf8(Some("app".to_string())));
3081 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3082
3083 match &result {
3084 SearchResult::Exact(row_ids) => {
3085 let ids: Vec<u64> = row_ids
3086 .true_rows()
3087 .row_addrs()
3088 .unwrap()
3089 .map(u64::from)
3090 .collect();
3091 assert!(ids.contains(&0), "Should contain row 0 (apple)");
3092 assert!(ids.contains(&1), "Should contain row 1 (app)");
3093 assert!(ids.contains(&2), "Should contain row 2 (application)");
3094 assert!(!ids.contains(&3), "Should not contain row 3 (banana)");
3095 }
3096 _ => panic!("Expected Exact result"),
3097 }
3098 }
3099
3100 #[tokio::test]
3101 async fn test_fragment_btree_index_consistency() {
3102 let full_tmpdir = TempObjDir::default();
3104 let full_store = Arc::new(LanceIndexStore::new(
3105 Arc::new(ObjectStore::local()),
3106 full_tmpdir.clone(),
3107 Arc::new(LanceCache::no_cache()),
3108 ));
3109
3110 let fragment_tmpdir = TempObjDir::default();
3111 let fragment_store = Arc::new(LanceIndexStore::new(
3112 Arc::new(ObjectStore::local()),
3113 fragment_tmpdir.clone(),
3114 Arc::new(LanceCache::no_cache()),
3115 ));
3116
3117 let total_count = 2 * DEFAULT_BTREE_BATCH_SIZE;
3120 let full_data_gen = gen_batch()
3121 .col("value", array::step::<Int32Type>())
3122 .col("_rowid", array::step::<UInt64Type>())
3123 .into_df_stream(RowCount::from(total_count / 2), BatchCount::from(2));
3124 let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3125 full_data_gen.schema(),
3126 full_data_gen,
3127 ));
3128
3129 train_btree_index(
3130 full_data_source,
3131 full_store.as_ref(),
3132 DEFAULT_BTREE_BATCH_SIZE,
3133 None,
3134 None,
3135 )
3136 .await
3137 .unwrap();
3138
3139 let half_count = DEFAULT_BTREE_BATCH_SIZE;
3142 let fragment1_gen = gen_batch()
3143 .col("value", array::step::<Int32Type>())
3144 .col("_rowid", array::step::<UInt64Type>())
3145 .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
3146 let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3147 fragment1_gen.schema(),
3148 fragment1_gen,
3149 ));
3150
3151 train_btree_index(
3152 fragment1_data_source,
3153 fragment_store.as_ref(),
3154 DEFAULT_BTREE_BATCH_SIZE,
3155 Some(vec![1]), None,
3157 )
3158 .await
3159 .unwrap();
3160
3161 let start_val = DEFAULT_BTREE_BATCH_SIZE as i32;
3163 let end_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3164 let values_second_half: Vec<i32> = (start_val..end_val).collect();
3165 let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
3166 let fragment2_gen = gen_batch()
3167 .col("value", array::cycle::<Int32Type>(values_second_half))
3168 .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
3169 .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
3170 let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3171 fragment2_gen.schema(),
3172 fragment2_gen,
3173 ));
3174
3175 train_btree_index(
3176 fragment2_data_source,
3177 fragment_store.as_ref(),
3178 DEFAULT_BTREE_BATCH_SIZE,
3179 Some(vec![2]), None,
3181 )
3182 .await
3183 .unwrap();
3184
3185 let part_page_files = vec![
3187 part_page_data_file_path(1 << 32),
3188 part_page_data_file_path(2 << 32),
3189 ];
3190
3191 let part_lookup_files = vec![
3192 part_lookup_file_path(1 << 32),
3193 part_lookup_file_path(2 << 32),
3194 ];
3195
3196 let progress = Arc::new(RecordingProgress::default());
3197 super::merge_metadata_files(
3198 fragment_store.as_ref(),
3199 &part_page_files,
3200 &part_lookup_files,
3201 Option::from(1usize),
3202 progress.clone(),
3203 )
3204 .await
3205 .unwrap();
3206
3207 let tags = progress
3208 .recorded_events()
3209 .iter()
3210 .map(|(kind, stage, _)| format!("{kind}:{stage}"))
3211 .collect::<Vec<_>>();
3212 let merge_start = tags
3213 .iter()
3214 .position(|e| e == "start:merge_pages")
3215 .expect("missing merge_pages start");
3216 let merge_complete = tags
3217 .iter()
3218 .position(|e| e == "complete:merge_pages")
3219 .expect("missing merge_pages complete");
3220 let lookup_start = tags
3221 .iter()
3222 .position(|e| e == "start:write_lookup_file")
3223 .expect("missing write_lookup_file start");
3224 let lookup_complete = tags
3225 .iter()
3226 .position(|e| e == "complete:write_lookup_file")
3227 .expect("missing write_lookup_file complete");
3228 assert!(merge_start < merge_complete);
3229 assert!(merge_complete < lookup_start);
3230 assert!(lookup_start < lookup_complete);
3231 assert!(
3232 tags.iter().any(|e| e == "progress:merge_pages"),
3233 "expected merge_pages progress callbacks"
3234 );
3235 assert!(
3236 tags.iter().any(|e| e == "progress:write_lookup_file"),
3237 "expected write_lookup_file progress callbacks"
3238 );
3239
3240 let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
3242 .await
3243 .unwrap();
3244
3245 let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
3246 .await
3247 .unwrap();
3248
3249 let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
3253 let full_result_0 = full_index
3254 .search(&query_0, &NoOpMetricsCollector)
3255 .await
3256 .unwrap();
3257 let merged_result_0 = merged_index
3258 .search(&query_0, &NoOpMetricsCollector)
3259 .await
3260 .unwrap();
3261 assert_eq!(full_result_0, merged_result_0, "Query for value 0 failed");
3262
3263 let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3265 let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
3266 let full_result_mid_first = full_index
3267 .search(&query_mid_first, &NoOpMetricsCollector)
3268 .await
3269 .unwrap();
3270 let merged_result_mid_first = merged_index
3271 .search(&query_mid_first, &NoOpMetricsCollector)
3272 .await
3273 .unwrap();
3274 assert_eq!(
3275 full_result_mid_first, merged_result_mid_first,
3276 "Query for value {} failed",
3277 mid_first_batch
3278 );
3279
3280 let first_second_batch = DEFAULT_BTREE_BATCH_SIZE as i32;
3282 let query_first_second =
3283 SargableQuery::Equals(ScalarValue::Int32(Some(first_second_batch)));
3284 let full_result_first_second = full_index
3285 .search(&query_first_second, &NoOpMetricsCollector)
3286 .await
3287 .unwrap();
3288 let merged_result_first_second = merged_index
3289 .search(&query_first_second, &NoOpMetricsCollector)
3290 .await
3291 .unwrap();
3292 assert_eq!(
3293 full_result_first_second, merged_result_first_second,
3294 "Query for value {} failed",
3295 first_second_batch
3296 );
3297
3298 let mid_second_batch = (DEFAULT_BTREE_BATCH_SIZE + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3300 let query_mid_second = SargableQuery::Equals(ScalarValue::Int32(Some(mid_second_batch)));
3301
3302 let full_result_mid_second = full_index
3303 .search(&query_mid_second, &NoOpMetricsCollector)
3304 .await
3305 .unwrap();
3306 let merged_result_mid_second = merged_index
3307 .search(&query_mid_second, &NoOpMetricsCollector)
3308 .await
3309 .unwrap();
3310 assert_eq!(
3311 full_result_mid_second, merged_result_mid_second,
3312 "Query for value {} failed",
3313 mid_second_batch
3314 );
3315 }
3316
3317 #[tokio::test]
3318 async fn test_fragment_btree_index_boundary_queries() {
3319 let full_tmpdir = TempObjDir::default();
3321 let full_store = Arc::new(LanceIndexStore::new(
3322 Arc::new(ObjectStore::local()),
3323 full_tmpdir.clone(),
3324 Arc::new(LanceCache::no_cache()),
3325 ));
3326
3327 let fragment_tmpdir = TempObjDir::default();
3328 let fragment_store = Arc::new(LanceIndexStore::new(
3329 Arc::new(ObjectStore::local()),
3330 fragment_tmpdir.clone(),
3331 Arc::new(LanceCache::no_cache()),
3332 ));
3333
3334 let total_count = 3 * DEFAULT_BTREE_BATCH_SIZE;
3336
3337 let full_data_gen = gen_batch()
3339 .col("value", array::step::<Int32Type>())
3340 .col("_rowid", array::step::<UInt64Type>())
3341 .into_df_stream(RowCount::from(total_count / 3), BatchCount::from(3));
3342 let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3343 full_data_gen.schema(),
3344 full_data_gen,
3345 ));
3346
3347 train_btree_index(
3348 full_data_source,
3349 full_store.as_ref(),
3350 DEFAULT_BTREE_BATCH_SIZE,
3351 None,
3352 None,
3353 )
3354 .await
3355 .unwrap();
3356
3357 let fragment_size = DEFAULT_BTREE_BATCH_SIZE;
3360 let fragment1_gen = gen_batch()
3361 .col("value", array::step::<Int32Type>())
3362 .col("_rowid", array::step::<UInt64Type>())
3363 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3364 let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3365 fragment1_gen.schema(),
3366 fragment1_gen,
3367 ));
3368
3369 train_btree_index(
3370 fragment1_data_source,
3371 fragment_store.as_ref(),
3372 DEFAULT_BTREE_BATCH_SIZE,
3373 Some(vec![1]),
3374 None,
3375 )
3376 .await
3377 .unwrap();
3378
3379 let start_val2 = DEFAULT_BTREE_BATCH_SIZE as i32;
3381 let end_val2 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3382 let values_fragment2: Vec<i32> = (start_val2..end_val2).collect();
3383 let row_ids_fragment2: Vec<u64> = (start_val2 as u64..end_val2 as u64).collect();
3384 let fragment2_gen = gen_batch()
3385 .col("value", array::cycle::<Int32Type>(values_fragment2))
3386 .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment2))
3387 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3388 let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3389 fragment2_gen.schema(),
3390 fragment2_gen,
3391 ));
3392
3393 train_btree_index(
3394 fragment2_data_source,
3395 fragment_store.as_ref(),
3396 DEFAULT_BTREE_BATCH_SIZE,
3397 Some(vec![2]),
3398 None,
3399 )
3400 .await
3401 .unwrap();
3402
3403 let start_val3 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3405 let end_val3 = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3406 let values_fragment3: Vec<i32> = (start_val3..end_val3).collect();
3407 let row_ids_fragment3: Vec<u64> = (start_val3 as u64..end_val3 as u64).collect();
3408 let fragment3_gen = gen_batch()
3409 .col("value", array::cycle::<Int32Type>(values_fragment3))
3410 .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment3))
3411 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
3412 let fragment3_data_source = Box::pin(RecordBatchStreamAdapter::new(
3413 fragment3_gen.schema(),
3414 fragment3_gen,
3415 ));
3416
3417 train_btree_index(
3418 fragment3_data_source,
3419 fragment_store.as_ref(),
3420 DEFAULT_BTREE_BATCH_SIZE,
3421 Some(vec![3]),
3422 None,
3423 )
3424 .await
3425 .unwrap();
3426
3427 let part_page_files = vec![
3429 part_page_data_file_path(1 << 32),
3430 part_page_data_file_path(2 << 32),
3431 part_page_data_file_path(3 << 32),
3432 ];
3433
3434 let part_lookup_files = vec![
3435 part_lookup_file_path(1 << 32),
3436 part_lookup_file_path(2 << 32),
3437 part_lookup_file_path(3 << 32),
3438 ];
3439
3440 super::merge_metadata_files(
3441 fragment_store.as_ref(),
3442 &part_page_files,
3443 &part_lookup_files,
3444 Option::from(1usize),
3445 noop_progress(),
3446 )
3447 .await
3448 .unwrap();
3449
3450 let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
3452 .await
3453 .unwrap();
3454
3455 let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
3456 .await
3457 .unwrap();
3458
3459 let query_min = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
3463 let full_result_min = full_index
3464 .search(&query_min, &NoOpMetricsCollector)
3465 .await
3466 .unwrap();
3467 let merged_result_min = merged_index
3468 .search(&query_min, &NoOpMetricsCollector)
3469 .await
3470 .unwrap();
3471 assert_eq!(
3472 full_result_min, merged_result_min,
3473 "Query for minimum value 0 failed"
3474 );
3475
3476 let max_val = (3 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3478 let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
3479 let full_result_max = full_index
3480 .search(&query_max, &NoOpMetricsCollector)
3481 .await
3482 .unwrap();
3483 let merged_result_max = merged_index
3484 .search(&query_max, &NoOpMetricsCollector)
3485 .await
3486 .unwrap();
3487 assert_eq!(
3488 full_result_max, merged_result_max,
3489 "Query for maximum value {} failed",
3490 max_val
3491 );
3492
3493 let fragment1_last = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3495 let query_frag1_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment1_last)));
3496 let full_result_frag1_last = full_index
3497 .search(&query_frag1_last, &NoOpMetricsCollector)
3498 .await
3499 .unwrap();
3500 let merged_result_frag1_last = merged_index
3501 .search(&query_frag1_last, &NoOpMetricsCollector)
3502 .await
3503 .unwrap();
3504 assert_eq!(
3505 full_result_frag1_last, merged_result_frag1_last,
3506 "Query for fragment 1 last value {} failed",
3507 fragment1_last
3508 );
3509
3510 let fragment2_first = DEFAULT_BTREE_BATCH_SIZE as i32;
3512 let query_frag2_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_first)));
3513 let full_result_frag2_first = full_index
3514 .search(&query_frag2_first, &NoOpMetricsCollector)
3515 .await
3516 .unwrap();
3517 let merged_result_frag2_first = merged_index
3518 .search(&query_frag2_first, &NoOpMetricsCollector)
3519 .await
3520 .unwrap();
3521 assert_eq!(
3522 full_result_frag2_first, merged_result_frag2_first,
3523 "Query for fragment 2 first value {} failed",
3524 fragment2_first
3525 );
3526
3527 let fragment2_last = (2 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3529 let query_frag2_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_last)));
3530 let full_result_frag2_last = full_index
3531 .search(&query_frag2_last, &NoOpMetricsCollector)
3532 .await
3533 .unwrap();
3534 let merged_result_frag2_last = merged_index
3535 .search(&query_frag2_last, &NoOpMetricsCollector)
3536 .await
3537 .unwrap();
3538 assert_eq!(
3539 full_result_frag2_last, merged_result_frag2_last,
3540 "Query for fragment 2 last value {} failed",
3541 fragment2_last
3542 );
3543
3544 let fragment3_first = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3546 let query_frag3_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment3_first)));
3547 let full_result_frag3_first = full_index
3548 .search(&query_frag3_first, &NoOpMetricsCollector)
3549 .await
3550 .unwrap();
3551 let merged_result_frag3_first = merged_index
3552 .search(&query_frag3_first, &NoOpMetricsCollector)
3553 .await
3554 .unwrap();
3555 assert_eq!(
3556 full_result_frag3_first, merged_result_frag3_first,
3557 "Query for fragment 3 first value {} failed",
3558 fragment3_first
3559 );
3560
3561 let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
3565 let full_result_below = full_index
3566 .search(&query_below_min, &NoOpMetricsCollector)
3567 .await
3568 .unwrap();
3569 let merged_result_below = merged_index
3570 .search(&query_below_min, &NoOpMetricsCollector)
3571 .await
3572 .unwrap();
3573 assert_eq!(
3574 full_result_below, merged_result_below,
3575 "Query for value below minimum (-1) failed"
3576 );
3577
3578 let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
3580 let full_result_above = full_index
3581 .search(&query_above_max, &NoOpMetricsCollector)
3582 .await
3583 .unwrap();
3584 let merged_result_above = merged_index
3585 .search(&query_above_max, &NoOpMetricsCollector)
3586 .await
3587 .unwrap();
3588 assert_eq!(
3589 full_result_above,
3590 merged_result_above,
3591 "Query for value above maximum ({}) failed",
3592 max_val + 1
3593 );
3594
3595 let range_start = (DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
3599 let range_end = (DEFAULT_BTREE_BATCH_SIZE + 100) as i32;
3600 let query_cross_frag = SargableQuery::Range(
3601 std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
3602 std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
3603 );
3604 let full_result_cross = full_index
3605 .search(&query_cross_frag, &NoOpMetricsCollector)
3606 .await
3607 .unwrap();
3608 let merged_result_cross = merged_index
3609 .search(&query_cross_frag, &NoOpMetricsCollector)
3610 .await
3611 .unwrap();
3612 assert_eq!(
3613 full_result_cross, merged_result_cross,
3614 "Cross-fragment range query [{}, {}] failed",
3615 range_start, range_end
3616 );
3617
3618 let single_frag_start = 100i32;
3620 let single_frag_end = 200i32;
3621 let query_single_frag = SargableQuery::Range(
3622 std::collections::Bound::Included(ScalarValue::Int32(Some(single_frag_start))),
3623 std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_frag_end))),
3624 );
3625 let full_result_single = full_index
3626 .search(&query_single_frag, &NoOpMetricsCollector)
3627 .await
3628 .unwrap();
3629 let merged_result_single = merged_index
3630 .search(&query_single_frag, &NoOpMetricsCollector)
3631 .await
3632 .unwrap();
3633 assert_eq!(
3634 full_result_single, merged_result_single,
3635 "Single fragment range query [{}, {}] failed",
3636 single_frag_start, single_frag_end
3637 );
3638
3639 let large_range_start = 100i32;
3641 let large_range_end = (3 * DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
3642 let query_large_range = SargableQuery::Range(
3643 std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
3644 std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
3645 );
3646 let full_result_large = full_index
3647 .search(&query_large_range, &NoOpMetricsCollector)
3648 .await
3649 .unwrap();
3650 let merged_result_large = merged_index
3651 .search(&query_large_range, &NoOpMetricsCollector)
3652 .await
3653 .unwrap();
3654 assert_eq!(
3655 full_result_large, merged_result_large,
3656 "Large range query [{}, {}] failed",
3657 large_range_start, large_range_end
3658 );
3659
3660 let lt_val = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3664 let query_lt = SargableQuery::Range(
3665 std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
3666 std::collections::Bound::Excluded(ScalarValue::Int32(Some(lt_val))),
3667 );
3668 let full_result_lt = full_index
3669 .search(&query_lt, &NoOpMetricsCollector)
3670 .await
3671 .unwrap();
3672 let merged_result_lt = merged_index
3673 .search(&query_lt, &NoOpMetricsCollector)
3674 .await
3675 .unwrap();
3676 assert_eq!(
3677 full_result_lt, merged_result_lt,
3678 "Less than query (<{}) failed",
3679 lt_val
3680 );
3681
3682 let gt_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3684 let max_range_val = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3685 let query_gt = SargableQuery::Range(
3686 std::collections::Bound::Excluded(ScalarValue::Int32(Some(gt_val))),
3687 std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
3688 );
3689 let full_result_gt = full_index
3690 .search(&query_gt, &NoOpMetricsCollector)
3691 .await
3692 .unwrap();
3693 let merged_result_gt = merged_index
3694 .search(&query_gt, &NoOpMetricsCollector)
3695 .await
3696 .unwrap();
3697 assert_eq!(
3698 full_result_gt, merged_result_gt,
3699 "Greater than query (>{}) failed",
3700 gt_val
3701 );
3702
3703 let lte_val = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
3705 let query_lte = SargableQuery::Range(
3706 std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
3707 std::collections::Bound::Included(ScalarValue::Int32(Some(lte_val))),
3708 );
3709 let full_result_lte = full_index
3710 .search(&query_lte, &NoOpMetricsCollector)
3711 .await
3712 .unwrap();
3713 let merged_result_lte = merged_index
3714 .search(&query_lte, &NoOpMetricsCollector)
3715 .await
3716 .unwrap();
3717 assert_eq!(
3718 full_result_lte, merged_result_lte,
3719 "Less than or equal query (<={}) failed",
3720 lte_val
3721 );
3722
3723 let gte_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3725 let query_gte = SargableQuery::Range(
3726 std::collections::Bound::Included(ScalarValue::Int32(Some(gte_val))),
3727 std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
3728 );
3729 let full_result_gte = full_index
3730 .search(&query_gte, &NoOpMetricsCollector)
3731 .await
3732 .unwrap();
3733 let merged_result_gte = merged_index
3734 .search(&query_gte, &NoOpMetricsCollector)
3735 .await
3736 .unwrap();
3737 assert_eq!(
3738 full_result_gte, merged_result_gte,
3739 "Greater than or equal query (>={}) failed",
3740 gte_val
3741 );
3742 }
3743
#[test]
fn test_extract_partition_id() {
    // Well-formed `part_<id>_...` names, including an id that does not fit in 32 bits.
    let well_formed = [
        ("part_123_page_data.lance", 123),
        ("part_456_page_lookup.lance", 456),
        ("part_4294967296_page_data.lance", 4294967296),
    ];
    for (file_name, expected_id) in well_formed {
        assert_eq!(
            super::extract_partition_id(file_name).unwrap(),
            expected_id,
            "unexpected partition id parsed from {file_name}"
        );
    }

    // Missing prefix, non-numeric id, and names truncated at or after the id.
    let malformed = [
        "invalid_filename.lance",
        "part_abc_page_data.lance",
        "part_123",
        "part_",
    ];
    for file_name in malformed {
        assert!(
            super::extract_partition_id(file_name).is_err(),
            "expected an error for {file_name}"
        );
    }
}
3766
3767 #[tokio::test]
3768 async fn test_cleanup_partition_files() {
3769 let tmpdir = TempObjDir::default();
3771 let test_store: Arc<dyn crate::scalar::IndexStore> = Arc::new(LanceIndexStore::new(
3772 Arc::new(ObjectStore::local()),
3773 tmpdir.clone(),
3774 Arc::new(LanceCache::no_cache()),
3775 ));
3776
3777 let lookup_files = vec![
3779 "part_123_page_lookup.lance".to_string(),
3780 "invalid_lookup_file.lance".to_string(),
3781 "part_456_page_lookup.lance".to_string(),
3782 ];
3783
3784 let page_files = vec![
3785 "part_123_page_data.lance".to_string(),
3786 "invalid_page_file.lance".to_string(),
3787 "part_456_page_data.lance".to_string(),
3788 ];
3789
3790 super::cleanup_partition_files(test_store.as_ref(), &lookup_files, &page_files).await;
3793 }
3794
3795 #[tokio::test]
3796 async fn test_btree_null_handling_in_queries() {
3797 let store = Arc::new(LanceIndexStore::new(
3798 Arc::new(ObjectStore::memory()),
3799 Path::default(),
3800 Arc::new(LanceCache::no_cache()),
3801 ));
3802
3803 let batch = record_batch!(
3806 ("value", Int32, [None, Some(0), Some(5)]),
3807 ("_rowid", UInt64, [0, 1, 2])
3808 )
3809 .unwrap();
3810 let stream = stream::once(futures::future::ok(batch.clone()));
3811 let stream = Box::pin(RecordBatchStreamAdapter::new(batch.schema(), stream));
3812
3813 super::train_btree_index(stream, store.as_ref(), 256, None, None)
3815 .await
3816 .unwrap();
3817
3818 let cache = LanceCache::with_capacity(1024 * 1024);
3819 let index = super::BTreeIndex::load(store.clone(), None, &cache)
3820 .await
3821 .unwrap();
3822
3823 let query = SargableQuery::Equals(ScalarValue::Int32(Some(5)));
3825 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3826
3827 match result {
3828 SearchResult::Exact(row_ids) => {
3829 let actual_rows: Vec<u64> = row_ids
3830 .true_rows()
3831 .row_addrs()
3832 .unwrap()
3833 .map(u64::from)
3834 .collect();
3835 assert_eq!(actual_rows, vec![2], "Should find row 2 where value == 5");
3836
3837 let null_row_ids = row_ids.null_rows();
3839 assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3840 let null_rows: Vec<u64> =
3841 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3842 assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3843 }
3844 _ => panic!("Expected Exact search result"),
3845 }
3846
3847 let query = SargableQuery::Range(
3849 std::ops::Bound::Included(ScalarValue::Int32(Some(0))),
3850 std::ops::Bound::Included(ScalarValue::Int32(Some(3))),
3851 );
3852 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3853
3854 match result {
3855 SearchResult::Exact(row_ids) => {
3856 let actual_rows: Vec<u64> = row_ids
3857 .true_rows()
3858 .row_addrs()
3859 .unwrap()
3860 .map(u64::from)
3861 .collect();
3862 assert_eq!(actual_rows, vec![1], "Should find row 1 where value == 0");
3863
3864 let null_row_ids = row_ids.null_rows();
3866 assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3867 let null_rows: Vec<u64> =
3868 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3869 assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3870 }
3871 _ => panic!("Expected Exact search result"),
3872 }
3873
3874 let query = SargableQuery::IsIn(vec![
3876 ScalarValue::Int32(Some(0)),
3877 ScalarValue::Int32(Some(5)),
3878 ]);
3879 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
3880
3881 match result {
3882 SearchResult::Exact(row_ids) => {
3883 let mut actual_rows: Vec<u64> = row_ids
3884 .true_rows()
3885 .row_addrs()
3886 .unwrap()
3887 .map(u64::from)
3888 .collect();
3889 actual_rows.sort();
3890 assert_eq!(
3891 actual_rows,
3892 vec![1, 2],
3893 "Should find rows 1 and 2 where value in [0, 5]"
3894 );
3895
3896 let null_row_ids = row_ids.null_rows();
3898 assert!(!null_row_ids.is_empty(), "null_row_ids should be non-empty");
3899 let null_rows: Vec<u64> =
3900 null_row_ids.row_addrs().unwrap().map(u64::from).collect();
3901 assert_eq!(null_rows, vec![0], "Should report row 0 as null");
3902 }
3903 _ => panic!("Expected Exact search result"),
3904 }
3905 }
3906
3907 #[tokio::test]
3908 async fn test_range_btree_index_consistency() {
3909 let full_tmpdir = TempObjDir::default();
3911 let full_store = Arc::new(LanceIndexStore::new(
3912 Arc::new(ObjectStore::local()),
3913 full_tmpdir.clone(),
3914 Arc::new(LanceCache::no_cache()),
3915 ));
3916
3917 let range_tmpdir = TempObjDir::default();
3918 let range_store = Arc::new(LanceIndexStore::new(
3919 Arc::new(ObjectStore::local()),
3920 range_tmpdir.clone(),
3921 Arc::new(LanceCache::no_cache()),
3922 ));
3923
3924 let total_count = 4 * DEFAULT_BTREE_BATCH_SIZE;
3927 let full_data_gen = gen_batch()
3928 .col("value", array::step::<Int32Type>())
3929 .col("_rowid", array::step::<UInt64Type>())
3930 .into_df_stream(RowCount::from(total_count / 4), BatchCount::from(4));
3931 let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
3932 full_data_gen.schema(),
3933 full_data_gen,
3934 ));
3935
3936 train_btree_index(
3937 full_data_source,
3938 full_store.as_ref(),
3939 DEFAULT_BTREE_BATCH_SIZE,
3940 None,
3941 None,
3942 )
3943 .await
3944 .unwrap();
3945
3946 let range1_gen = gen_batch()
3949 .col("value", array::step::<Int32Type>())
3950 .col("_rowid", array::step::<UInt64Type>())
3951 .into_df_stream(
3952 RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
3953 BatchCount::from(5),
3954 );
3955 let range1_data_source = Box::pin(RecordBatchStreamAdapter::new(
3956 range1_gen.schema(),
3957 range1_gen,
3958 ));
3959
3960 train_btree_index(
3961 range1_data_source,
3962 range_store.as_ref(),
3963 DEFAULT_BTREE_BATCH_SIZE,
3964 None,
3965 Option::from(0u32),
3966 )
3967 .await
3968 .unwrap();
3969
3970 let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
3972 let end_val = (4 * DEFAULT_BTREE_BATCH_SIZE) as i32;
3973 let values_second_half: Vec<i32> = (start_val..end_val).collect();
3974 let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
3975 let range2_gen = gen_batch()
3976 .col("value", array::cycle::<Int32Type>(values_second_half))
3977 .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
3978 .into_df_stream(
3979 RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
3980 BatchCount::from(3),
3981 );
3982 let range2_data_source = Box::pin(RecordBatchStreamAdapter::new(
3983 range2_gen.schema(),
3984 range2_gen,
3985 ));
3986
3987 train_btree_index(
3988 range2_data_source,
3989 range_store.as_ref(),
3990 DEFAULT_BTREE_BATCH_SIZE,
3991 None,
3992 Option::from(1u32),
3993 )
3994 .await
3995 .unwrap();
3996
3997 let part_page_files = vec![
3999 part_page_data_file_path(0 << 32),
4000 part_page_data_file_path(1 << 32),
4001 ];
4002
4003 let part_lookup_files = vec![
4004 part_lookup_file_path(0 << 32),
4005 part_lookup_file_path(1 << 32),
4006 ];
4007
4008 super::merge_metadata_files(
4009 range_store.as_ref(),
4010 &part_page_files,
4011 &part_lookup_files,
4012 Option::from(1usize),
4013 noop_progress(),
4014 )
4015 .await
4016 .unwrap();
4017
4018 let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
4019 .await
4020 .unwrap();
4021
4022 let ranged_index = BTreeIndex::load(range_store.clone(), None, &LanceCache::no_cache())
4023 .await
4024 .unwrap();
4025
4026 let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
4030 let full_result_0 = full_index
4031 .search(&query_0, &NoOpMetricsCollector)
4032 .await
4033 .unwrap();
4034 let ranged_result_0 = ranged_index
4035 .search(&query_0, &NoOpMetricsCollector)
4036 .await
4037 .unwrap();
4038 assert_eq!(full_result_0, ranged_result_0, "Query for value 0 failed");
4039
4040 let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4042 let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
4043 let full_result_mid_first = full_index
4044 .search(&query_mid_first, &NoOpMetricsCollector)
4045 .await
4046 .unwrap();
4047 let ranged_result_mid_first = ranged_index
4048 .search(&query_mid_first, &NoOpMetricsCollector)
4049 .await
4050 .unwrap();
4051 assert_eq!(
4052 full_result_mid_first, ranged_result_mid_first,
4053 "Query for value {} failed",
4054 mid_first_batch
4055 );
4056
4057 let mid_last_batch = (DEFAULT_BTREE_BATCH_SIZE * 3 + (DEFAULT_BTREE_BATCH_SIZE / 2)) as i32;
4059 let query_mid_last = SargableQuery::Equals(ScalarValue::Int32(Some(mid_last_batch)));
4060 let full_result_mid_last = full_index
4061 .search(&query_mid_last, &NoOpMetricsCollector)
4062 .await
4063 .unwrap();
4064 let ranged_result_mid_last = ranged_index
4065 .search(&query_mid_last, &NoOpMetricsCollector)
4066 .await
4067 .unwrap();
4068 assert_eq!(
4069 full_result_mid_last, ranged_result_mid_last,
4070 "Query for value {} failed",
4071 mid_last_batch
4072 );
4073
4074 let max_val = (4 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
4076 let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
4077 let full_result_max = full_index
4078 .search(&query_max, &NoOpMetricsCollector)
4079 .await
4080 .unwrap();
4081 let ranged_result_max = ranged_index
4082 .search(&query_max, &NoOpMetricsCollector)
4083 .await
4084 .unwrap();
4085 assert_eq!(
4086 full_result_max, ranged_result_max,
4087 "Query for maximum value {} failed",
4088 max_val
4089 );
4090
4091 let second_first_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4093 let query_second_first = SargableQuery::Equals(ScalarValue::Int32(Some(second_first_val)));
4094 let full_result_second_first = full_index
4095 .search(&query_second_first, &NoOpMetricsCollector)
4096 .await
4097 .unwrap();
4098 let ranged_result_second_first = ranged_index
4099 .search(&query_second_first, &NoOpMetricsCollector)
4100 .await
4101 .unwrap();
4102 assert_eq!(
4103 full_result_second_first, ranged_result_second_first,
4104 "Query for first value of the second page file {} failed",
4105 second_first_val
4106 );
4107
4108 let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
4110 let full_result_below = full_index
4111 .search(&query_below_min, &NoOpMetricsCollector)
4112 .await
4113 .unwrap();
4114 let ranged_result_below = ranged_index
4115 .search(&query_below_min, &NoOpMetricsCollector)
4116 .await
4117 .unwrap();
4118 assert_eq!(
4119 full_result_below, ranged_result_below,
4120 "Query for value below minimum (-1) failed"
4121 );
4122
4123 let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
4125 let full_result_above = full_index
4126 .search(&query_above_max, &NoOpMetricsCollector)
4127 .await
4128 .unwrap();
4129 let ranged_result_above = ranged_index
4130 .search(&query_above_max, &NoOpMetricsCollector)
4131 .await
4132 .unwrap();
4133 assert_eq!(
4134 full_result_above,
4135 ranged_result_above,
4136 "Query for value above maximum ({}) failed",
4137 max_val + 1
4138 );
4139
4140 let range_start =
4144 (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2 - 100) as i32;
4145 let range_end = range_start + 200;
4146 let query_cross_range = SargableQuery::Range(
4147 std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
4148 std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
4149 );
4150 let full_result_cross = full_index
4151 .search(&query_cross_range, &NoOpMetricsCollector)
4152 .await
4153 .unwrap();
4154 let ranged_result_cross = ranged_index
4155 .search(&query_cross_range, &NoOpMetricsCollector)
4156 .await
4157 .unwrap();
4158 assert_eq!(
4159 full_result_cross, ranged_result_cross,
4160 "Cross-range range query [{}, {}] failed",
4161 range_start, range_end
4162 );
4163
4164 let single_range_start = (DEFAULT_BTREE_BATCH_SIZE * 4 - 300) as i32;
4166 let single_range_end = single_range_start + 200;
4167 let query_single_range = SargableQuery::Range(
4168 std::collections::Bound::Included(ScalarValue::Int32(Some(single_range_start))),
4169 std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_range_end))),
4170 );
4171 let full_result_single = full_index
4172 .search(&query_single_range, &NoOpMetricsCollector)
4173 .await
4174 .unwrap();
4175 let ranged_result_single = ranged_index
4176 .search(&query_single_range, &NoOpMetricsCollector)
4177 .await
4178 .unwrap();
4179 assert_eq!(
4180 full_result_single, ranged_result_single,
4181 "Single range query [{}, {}] failed",
4182 single_range_start, single_range_end
4183 );
4184
4185 let large_range_start = 100_i32;
4187 let large_range_end = (DEFAULT_BTREE_BATCH_SIZE * 4 - 100) as i32;
4188 let query_large_range = SargableQuery::Range(
4189 std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
4190 std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
4191 );
4192 let full_result_single = full_index
4193 .search(&query_large_range, &NoOpMetricsCollector)
4194 .await
4195 .unwrap();
4196 let ranged_result_single = ranged_index
4197 .search(&query_large_range, &NoOpMetricsCollector)
4198 .await
4199 .unwrap();
4200 assert_eq!(
4201 full_result_single, ranged_result_single,
4202 "Single fragment range query [{}, {}] failed",
4203 large_range_start, large_range_end
4204 );
4205
4206 let remap_dir = TempObjDir::default();
4207 let remap_store = Arc::new(LanceIndexStore::new(
4208 Arc::new(ObjectStore::local()),
4209 remap_dir.clone(),
4210 Arc::new(LanceCache::no_cache()),
4211 ));
4212
4213 ranged_index
4215 .remap(&HashMap::default(), remap_store.as_ref())
4216 .await
4217 .unwrap();
4218
4219 let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
4220 .await
4221 .unwrap();
4222
4223 assert_eq!(remap_index.page_lookup, ranged_index.page_lookup);
4224
4225 let ranged_pages = range_store
4226 .open_index_file(part_page_data_file_path(1 << 32).as_str())
4227 .await
4228 .unwrap();
4229 let remapped_pages = remap_store
4230 .open_index_file(part_page_data_file_path(1 << 32).as_str())
4231 .await
4232 .unwrap();
4233
4234 assert_eq!(ranged_pages.num_rows(), remapped_pages.num_rows());
4235
4236 let original_data = ranged_pages
4237 .read_record_batch(0, ranged_pages.num_rows() as u64)
4238 .await
4239 .unwrap();
4240 let remapped_data = remapped_pages
4241 .read_record_batch(0, remapped_pages.num_rows() as u64)
4242 .await
4243 .unwrap();
4244
4245 assert_eq!(original_data, remapped_data);
4246 }
4247
4248 #[tokio::test]
4249 async fn test_update_ranged_index() {
4250 let old_tmpdir = TempObjDir::default();
4252 let old_store = Arc::new(LanceIndexStore::new(
4253 Arc::new(ObjectStore::local()),
4254 old_tmpdir.clone(),
4255 Arc::new(LanceCache::no_cache()),
4256 ));
4257
4258 let new_tmpdir = TempObjDir::default();
4259 let new_store = Arc::new(LanceIndexStore::new(
4260 Arc::new(ObjectStore::local()),
4261 new_tmpdir.clone(),
4262 Arc::new(LanceCache::no_cache()),
4263 ));
4264
4265 let range1_gen = gen_batch()
4267 .col("value", array::step::<Int32Type>())
4268 .col("_rowid", array::step::<UInt64Type>())
4269 .into_df_stream(
4270 RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4271 BatchCount::from(5),
4272 );
4273 let range1_data_source = Box::pin(RecordBatchStreamAdapter::new(
4274 range1_gen.schema(),
4275 range1_gen,
4276 ));
4277
4278 train_btree_index(
4279 range1_data_source,
4280 old_store.as_ref(),
4281 DEFAULT_BTREE_BATCH_SIZE,
4282 None,
4283 Option::from(1u32),
4284 )
4285 .await
4286 .unwrap();
4287
4288 let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2 + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
4290 let end_val = (4 * DEFAULT_BTREE_BATCH_SIZE) as i32;
4291 let values_second_half: Vec<i32> = (start_val..end_val).collect();
4292 let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
4293 let range2_gen = gen_batch()
4294 .col("value", array::cycle::<Int32Type>(values_second_half))
4295 .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
4296 .into_df_stream(
4297 RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4298 BatchCount::from(3),
4299 );
4300 let range2_data_source = Box::pin(RecordBatchStreamAdapter::new(
4301 range2_gen.schema(),
4302 range2_gen,
4303 ));
4304
4305 train_btree_index(
4306 range2_data_source,
4307 old_store.as_ref(),
4308 DEFAULT_BTREE_BATCH_SIZE,
4309 None,
4310 Option::from(2u32),
4311 )
4312 .await
4313 .unwrap();
4314
4315 let part_page_files = vec![
4317 part_page_data_file_path(1 << 32),
4318 part_page_data_file_path(2 << 32),
4319 ];
4320
4321 let part_lookup_files = vec![
4322 part_lookup_file_path(1 << 32),
4323 part_lookup_file_path(2 << 32),
4324 ];
4325
4326 super::merge_metadata_files(
4327 old_store.as_ref(),
4328 &part_page_files,
4329 &part_lookup_files,
4330 Option::from(1usize),
4331 noop_progress(),
4332 )
4333 .await
4334 .unwrap();
4335
4336 let start_val = (DEFAULT_BTREE_BATCH_SIZE * 2) as i32;
4338 let end_val = (DEFAULT_BTREE_BATCH_SIZE * 3) as i32;
4339 let row_id_delta = (DEFAULT_BTREE_BATCH_SIZE * 3) as i32;
4340 let values: Vec<i32> = (start_val..end_val).collect();
4341 let row_ids: Vec<u64> =
4342 ((start_val + row_id_delta) as u64..(end_val + row_id_delta) as u64).collect();
4343 let update_data = gen_batch()
4344 .col("value", array::cycle::<Int32Type>(values))
4345 .col("_rowid", array::cycle::<UInt64Type>(row_ids))
4346 .into_df_stream(
4347 RowCount::from(DEFAULT_BTREE_BATCH_SIZE / 2),
4348 BatchCount::from(2),
4349 );
4350 let update_data_source = Box::pin(RecordBatchStreamAdapter::new(
4351 update_data.schema(),
4352 update_data,
4353 ));
4354
4355 let ranged_index = BTreeIndex::load(old_store.clone(), None, &LanceCache::no_cache())
4356 .await
4357 .unwrap();
4358
4359 ranged_index
4361 .update(update_data_source, new_store.as_ref(), None)
4362 .await
4363 .expect("Error in updating ranged index");
4364
4365 let updated_index = BTreeIndex::load(new_store.clone(), None, &LanceCache::no_cache())
4366 .await
4367 .unwrap();
4368
4369 assert!(
4370 updated_index.ranges_to_files.is_none(),
4371 "Updated ranged-btree-index should fall back to non-ranged"
4372 );
4373
4374 let updated_value = (DEFAULT_BTREE_BATCH_SIZE * 2 + (DEFAULT_BTREE_BATCH_SIZE / 2)) as i32;
4375 let updated_query = SargableQuery::Equals(ScalarValue::Int32(Some(updated_value)));
4376
4377 let query_result = updated_index
4378 .search(&updated_query, &NoOpMetricsCollector)
4379 .await
4380 .unwrap();
4381 match query_result {
4382 SearchResult::Exact(row_id_map) => {
4383 assert!(
4384 row_id_map.selected(updated_value as u64),
4385 "Updated index should contain original rowids."
4386 );
4387 assert!(
4388 row_id_map.selected((updated_value + row_id_delta) as u64),
4389 "Updated index should contain new rowids"
4390 );
4391 }
4392 _ => {
4393 panic!("Btree search result should always be Exact.");
4394 }
4395 }
4396 }
4397
4398 #[tokio::test]
4399 async fn test_update_with_exact_row_id_filter() {
4400 let old_tmpdir = TempObjDir::default();
4401 let old_store = Arc::new(LanceIndexStore::new(
4402 Arc::new(ObjectStore::local()),
4403 old_tmpdir.clone(),
4404 Arc::new(LanceCache::no_cache()),
4405 ));
4406
4407 let new_tmpdir = TempObjDir::default();
4408 let new_store = Arc::new(LanceIndexStore::new(
4409 Arc::new(ObjectStore::local()),
4410 new_tmpdir.clone(),
4411 Arc::new(LanceCache::no_cache()),
4412 ));
4413
4414 let old_data = gen_batch()
4415 .col("value", array::step::<Int32Type>())
4416 .col("_rowid", array::step::<UInt64Type>())
4417 .into_df_stream(RowCount::from(512), BatchCount::from(2));
4418 let old_data_source = Box::pin(RecordBatchStreamAdapter::new(old_data.schema(), old_data));
4419 train_btree_index(
4420 old_data_source,
4421 old_store.as_ref(),
4422 DEFAULT_BTREE_BATCH_SIZE,
4423 None,
4424 None,
4425 )
4426 .await
4427 .unwrap();
4428
4429 let index = BTreeIndex::load(old_store.clone(), None, &LanceCache::no_cache())
4430 .await
4431 .unwrap();
4432
4433 let new_data = gen_batch()
4434 .col("value", array::step_custom::<Int32Type>(2000, 1))
4435 .col("_rowid", array::step_custom::<UInt64Type>(2000, 1))
4436 .into_df_stream(RowCount::from(100), BatchCount::from(1));
4437 let new_data_source = Box::pin(RecordBatchStreamAdapter::new(new_data.schema(), new_data));
4438
4439 let mut retained_old_rows = RowAddrTreeMap::new();
4440 retained_old_rows.insert_range(0..64);
4441 retained_old_rows.insert_range(300..364);
4442
4443 index
4444 .update(
4445 new_data_source,
4446 new_store.as_ref(),
4447 Some(OldIndexDataFilter::RowIds(retained_old_rows)),
4448 )
4449 .await
4450 .unwrap();
4451
4452 let updated_index = BTreeIndex::load(new_store.clone(), None, &LanceCache::no_cache())
4453 .await
4454 .unwrap();
4455
4456 let present = |value: i32| {
4457 let updated_index = updated_index.clone();
4458 async move {
4459 let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4460 match updated_index
4461 .search(&query, &NoOpMetricsCollector)
4462 .await
4463 .unwrap()
4464 {
4465 SearchResult::Exact(row_id_map) => row_id_map.selected(value as u64),
4466 _ => unreachable!("Btree search result should always be Exact"),
4467 }
4468 }
4469 };
4470
4471 assert!(present(12).await);
4472 assert!(present(320).await);
4473 assert!(!present(120).await);
4474 assert!(!present(420).await);
4475 assert!(present(2005).await);
4476 }
4477
4478 #[tokio::test]
4488 async fn test_btree_remap_big_deletions() {
4489 let tmpdir = TempObjDir::default();
4490 let test_store = Arc::new(LanceIndexStore::new(
4491 Arc::new(ObjectStore::local()),
4492 tmpdir.clone(),
4493 Arc::new(LanceCache::no_cache()),
4494 ));
4495
4496 let batch_size = 4096;
4499 let total_rows = 15000;
4500
4501 let stream = gen_batch()
4502 .col("value", array::step::<Int32Type>())
4503 .col("_rowid", array::step::<UInt64Type>())
4504 .into_df_stream(RowCount::from(total_rows), BatchCount::from(1));
4505
4506 train_btree_index(stream, test_store.as_ref(), batch_size, None, None)
4507 .await
4508 .unwrap();
4509
4510 let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
4511 .await
4512 .unwrap();
4513
4514 let mut mapping: HashMap<u64, Option<u64>> = HashMap::new();
4520
4521 for old_id in 1001..10000 {
4523 mapping.insert(old_id, None);
4524 }
4525
4526 let mut new_id_counter = 100_000;
4527
4528 for old_id in (0..1000).chain(10000..15000) {
4530 let new_id = new_id_counter;
4531 new_id_counter += 1;
4532 mapping.insert(old_id, Some(new_id));
4533 }
4534
4535 let remap_dir = TempObjDir::default();
4536 let remap_store = Arc::new(LanceIndexStore::new(
4537 Arc::new(ObjectStore::local()),
4538 remap_dir.clone(),
4539 Arc::new(LanceCache::no_cache()),
4540 ));
4541
4542 index.remap(&mapping, remap_store.as_ref()).await.unwrap();
4544
4545 let remapped_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
4546 .await
4547 .unwrap();
4548
4549 let should_exist = vec![0, 500, 1000, 10000, 13000, 14000, 14999];
4553 for value in should_exist {
4554 let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4555 let result = remapped_index
4556 .search(&query, &NoOpMetricsCollector)
4557 .await
4558 .unwrap();
4559 match result {
4560 SearchResult::Exact(row_id_map) => {
4561 assert!(
4562 !row_id_map.is_empty(),
4563 "Value {} should exist in remapped index but was not found",
4564 value
4565 );
4566 }
4567 _ => {
4568 panic!("Btree search result should always be Exact.");
4569 }
4570 }
4571 }
4572
4573 let should_not_exist = vec![1001, 5000, 8000, 9999];
4575 for value in should_not_exist {
4576 let query = SargableQuery::Equals(ScalarValue::Int32(Some(value)));
4577 let result = remapped_index
4578 .search(&query, &NoOpMetricsCollector)
4579 .await
4580 .unwrap();
4581 match result {
4582 SearchResult::Exact(row_id_map) => {
4583 assert!(
4584 row_id_map.is_empty(),
4585 "Value {} should NOT exist in remapped index but was found",
4586 value
4587 );
4588 }
4589 _ => {
4590 panic!("Btree search result should always be Exact.");
4591 }
4592 }
4593 }
4594 }
4595
4596 #[tokio::test]
4602 async fn test_search_tracks_nulls_for_absent_value() {
4603 use arrow_array::{Int32Array, UInt64Array};
4604
4605 let tmpdir = TempObjDir::default();
4606 let test_store = Arc::new(LanceIndexStore::new(
4607 Arc::new(ObjectStore::local()),
4608 tmpdir.clone(),
4609 Arc::new(LanceCache::no_cache()),
4610 ));
4611
4612 let num_rows = 5000u64;
4616 let values: Int32Array = (0..num_rows)
4617 .map(|i| {
4618 if i % 5 != 0 {
4619 None } else {
4621 Some(100 + i as i32) }
4623 })
4624 .collect();
4625 let row_ids = UInt64Array::from_iter_values(0..num_rows);
4626 let data = arrow_array::RecordBatch::try_from_iter(vec![
4627 ("value", Arc::new(values) as arrow_array::ArrayRef),
4628 ("_rowid", Arc::new(row_ids) as arrow_array::ArrayRef),
4629 ])
4630 .unwrap();
4631
4632 let schema = data.schema();
4633 let stream: SendableRecordBatchStream = Box::pin(RecordBatchStreamAdapter::new(
4634 schema,
4635 stream::iter(vec![Ok(data)]),
4636 ));
4637 train_btree_index(stream, test_store.as_ref(), num_rows, None, None)
4638 .await
4639 .unwrap();
4640
4641 let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
4642 .await
4643 .unwrap();
4644
4645 assert!(
4647 !index.page_lookup.all_null_pages.is_empty(),
4648 "Test setup requires all-null pages; got null_pages={}, all_null_pages={}",
4649 index.page_lookup.null_pages.len(),
4650 index.page_lookup.all_null_pages.len(),
4651 );
4652
4653 let metrics = NoOpMetricsCollector;
4654
4655 let result = index
4657 .search(
4658 &SargableQuery::Equals(ScalarValue::Int32(Some(0))),
4659 &metrics,
4660 )
4661 .await
4662 .unwrap();
4663
4664 match result {
4665 SearchResult::Exact(set) => {
4666 assert!(set.true_rows().is_empty(), "No rows should match Equals(0)");
4668 assert!(
4670 !set.null_rows().is_empty(),
4671 "Null rows must be tracked even when no pages match the value"
4672 );
4673 }
4674 _ => panic!("BTree search should return Exact"),
4675 }
4676
4677 let result = index
4679 .search(
4680 &SargableQuery::Range(
4681 std::ops::Bound::Unbounded,
4682 std::ops::Bound::Excluded(ScalarValue::Int32(Some(50))),
4683 ),
4684 &metrics,
4685 )
4686 .await
4687 .unwrap();
4688
4689 match result {
4690 SearchResult::Exact(set) => {
4691 assert!(set.true_rows().is_empty(), "No rows should be < 50");
4692 assert!(
4693 !set.null_rows().is_empty(),
4694 "Null rows must be tracked for range queries too"
4695 );
4696 }
4697 _ => panic!("BTree search should return Exact"),
4698 }
4699 }
4700}