1use std::{
5 any::Any,
6 cmp::Ordering,
7 collections::{BTreeMap, BinaryHeap, HashMap},
8 fmt::{Debug, Display},
9 ops::Bound,
10 sync::Arc,
11};
12
13use super::{
14 flat::FlatIndexMetadata, AnyQuery, BuiltinIndexType, IndexReader, IndexStore, IndexWriter,
15 MetricsCollector, SargableQuery, ScalarIndex, ScalarIndexParams, SearchResult,
16};
17use crate::pbold;
18use crate::{
19 frag_reuse::FragReuseIndex,
20 scalar::{
21 expression::{SargableQueryParser, ScalarQueryParser},
22 registry::{ScalarIndexPlugin, TrainingOrdering, TrainingRequest, VALUE_COLUMN_NAME},
23 CreatedIndex, UpdateCriteria,
24 },
25};
26use crate::{metrics::NoOpMetricsCollector, scalar::registry::TrainingCriteria};
27use crate::{Index, IndexType};
28use arrow_array::{new_empty_array, Array, RecordBatch, UInt32Array};
29use arrow_schema::{DataType, Field, Schema, SortOptions};
30use async_trait::async_trait;
31use datafusion::physical_plan::{
32 sorts::sort_preserving_merge::SortPreservingMergeExec, stream::RecordBatchStreamAdapter,
33 union::UnionExec, ExecutionPlan, SendableRecordBatchStream,
34};
35use datafusion_common::{DataFusionError, ScalarValue};
36use datafusion_physical_expr::{expressions::Column, PhysicalSortExpr};
37use deepsize::DeepSizeOf;
38use futures::{
39 future::BoxFuture,
40 stream::{self},
41 FutureExt, Stream, StreamExt, TryFutureExt, TryStreamExt,
42};
43use lance_core::{
44 cache::{CacheKey, LanceCache, WeakLanceCache},
45 error::LanceOptionExt,
46 utils::{
47 mask::RowIdTreeMap,
48 tokio::get_num_compute_intensive_cpus,
49 tracing::{IO_TYPE_LOAD_SCALAR_PART, TRACE_IO_EVENTS},
50 },
51 Error, Result, ROW_ID,
52};
53use lance_datafusion::{
54 chunker::chunk_concat_stream,
55 exec::{execute_plan, LanceExecutionOptions, OneShotExec},
56};
57use lance_io::object_store::ObjectStore;
58use log::{debug, warn};
59use object_store::path::Path;
60use roaring::RoaringBitmap;
61use serde::{Deserialize, Serialize, Serializer};
62use snafu::location;
63use tracing::info;
64
65const BTREE_LOOKUP_NAME: &str = "page_lookup.lance";
66const BTREE_PAGES_NAME: &str = "page_data.lance";
67pub const DEFAULT_BTREE_BATCH_SIZE: u64 = 4096;
68const BATCH_SIZE_META_KEY: &str = "batch_size";
69const BTREE_INDEX_VERSION: u32 = 0;
70pub(crate) const BTREE_VALUES_COLUMN: &str = "values";
71pub(crate) const BTREE_IDS_COLUMN: &str = "ids";
72
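/// A wrapper around a DataFusion `ScalarValue` that provides a total ordering (`Ord`) so
/// values can be used as keys in the page lookup tree.  A `None` value sorts before any
/// present value of the same type, and floats are compared with `total_cmp` so `NaN` has a
/// defined position.
///
/// A minimal sketch of the intended comparison semantics (the values are hypothetical):
///
/// ```ignore
/// let a = OrderableScalarValue(ScalarValue::Int32(Some(1)));
/// let b = OrderableScalarValue(ScalarValue::Int32(Some(2)));
/// let null = OrderableScalarValue(ScalarValue::Int32(None));
/// assert!(a < b);
/// assert!(null < a);
/// ```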
73#[derive(Clone, Debug)]
75pub struct OrderableScalarValue(pub ScalarValue);
76
77impl DeepSizeOf for OrderableScalarValue {
78 fn deep_size_of_children(&self, _context: &mut deepsize::Context) -> usize {
79 self.0.size() - std::mem::size_of::<ScalarValue>()
81 }
82}
83
84impl Display for OrderableScalarValue {
85 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
86 std::fmt::Display::fmt(&self.0, f)
87 }
88}
89
90impl PartialEq for OrderableScalarValue {
91 fn eq(&self, other: &Self) -> bool {
92 self.0.eq(&other.0)
93 }
94}
95
96impl Eq for OrderableScalarValue {}
97
98impl PartialOrd for OrderableScalarValue {
99 fn partial_cmp(&self, other: &Self) -> Option<std::cmp::Ordering> {
100 Some(self.cmp(other))
101 }
102}
103
104impl Ord for OrderableScalarValue {
110 fn cmp(&self, other: &Self) -> Ordering {
111 use ScalarValue::*;
112 match (&self.0, &other.0) {
116 (Decimal128(v1, p1, s1), Decimal128(v2, p2, s2)) => {
117 if p1.eq(p2) && s1.eq(s2) {
118 v1.cmp(v2)
119 } else {
120 panic!("Attempt to compare decimals with unequal precision / scale")
122 }
123 }
124 (Decimal128(v1, _, _), Null) => {
125 if v1.is_none() {
126 Ordering::Equal
127 } else {
128 Ordering::Greater
129 }
130 }
131 (Decimal128(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
132 (Decimal256(v1, p1, s1), Decimal256(v2, p2, s2)) => {
133 if p1.eq(p2) && s1.eq(s2) {
134 v1.cmp(v2)
135 } else {
136 panic!("Attempt to compare decimals with unequal precision / scale")
138 }
139 }
140 (Decimal256(v1, _, _), Null) => {
141 if v1.is_none() {
142 Ordering::Equal
143 } else {
144 Ordering::Greater
145 }
146 }
147 (Decimal256(_, _, _), _) => panic!("Attempt to compare decimal with non-decimal"),
148 (Boolean(v1), Boolean(v2)) => v1.cmp(v2),
149 (Boolean(v1), Null) => {
150 if v1.is_none() {
151 Ordering::Equal
152 } else {
153 Ordering::Greater
154 }
155 }
156 (Boolean(_), _) => panic!("Attempt to compare boolean with non-boolean"),
157 (Float32(v1), Float32(v2)) => match (v1, v2) {
158 (Some(f1), Some(f2)) => f1.total_cmp(f2),
159 (None, Some(_)) => Ordering::Less,
160 (Some(_), None) => Ordering::Greater,
161 (None, None) => Ordering::Equal,
162 },
163 (Float32(v1), Null) => {
164 if v1.is_none() {
165 Ordering::Equal
166 } else {
167 Ordering::Greater
168 }
169 }
170 (Float32(_), _) => panic!("Attempt to compare f32 with non-f32"),
171 (Float64(v1), Float64(v2)) => match (v1, v2) {
172 (Some(f1), Some(f2)) => f1.total_cmp(f2),
173 (None, Some(_)) => Ordering::Less,
174 (Some(_), None) => Ordering::Greater,
175 (None, None) => Ordering::Equal,
176 },
177 (Float64(v1), Null) => {
178 if v1.is_none() {
179 Ordering::Equal
180 } else {
181 Ordering::Greater
182 }
183 }
184 (Float64(_), _) => panic!("Attempt to compare f64 with non-f64"),
185 (Float16(v1), Float16(v2)) => match (v1, v2) {
186 (Some(f1), Some(f2)) => f1.total_cmp(f2),
187 (None, Some(_)) => Ordering::Less,
188 (Some(_), None) => Ordering::Greater,
189 (None, None) => Ordering::Equal,
190 },
191 (Float16(v1), Null) => {
192 if v1.is_none() {
193 Ordering::Equal
194 } else {
195 Ordering::Greater
196 }
197 }
198 (Float16(_), _) => panic!("Attempt to compare f16 with non-f16"),
199 (Int8(v1), Int8(v2)) => v1.cmp(v2),
200 (Int8(v1), Null) => {
201 if v1.is_none() {
202 Ordering::Equal
203 } else {
204 Ordering::Greater
205 }
206 }
207 (Int8(_), _) => panic!("Attempt to compare Int8 with non-Int8"),
208 (Int16(v1), Int16(v2)) => v1.cmp(v2),
209 (Int16(v1), Null) => {
210 if v1.is_none() {
211 Ordering::Equal
212 } else {
213 Ordering::Greater
214 }
215 }
216 (Int16(_), _) => panic!("Attempt to compare Int16 with non-Int16"),
217 (Int32(v1), Int32(v2)) => v1.cmp(v2),
218 (Int32(v1), Null) => {
219 if v1.is_none() {
220 Ordering::Equal
221 } else {
222 Ordering::Greater
223 }
224 }
225 (Int32(_), _) => panic!("Attempt to compare Int32 with non-Int32"),
226 (Int64(v1), Int64(v2)) => v1.cmp(v2),
227 (Int64(v1), Null) => {
228 if v1.is_none() {
229 Ordering::Equal
230 } else {
231 Ordering::Greater
232 }
233 }
234 (Int64(_), _) => panic!("Attempt to compare Int64 with non-Int64"),
235 (UInt8(v1), UInt8(v2)) => v1.cmp(v2),
236 (UInt8(v1), Null) => {
237 if v1.is_none() {
238 Ordering::Equal
239 } else {
240 Ordering::Greater
241 }
242 }
243 (UInt8(_), _) => panic!("Attempt to compare UInt8 with non-UInt8"),
244 (UInt16(v1), UInt16(v2)) => v1.cmp(v2),
245 (UInt16(v1), Null) => {
246 if v1.is_none() {
247 Ordering::Equal
248 } else {
249 Ordering::Greater
250 }
251 }
252 (UInt16(_), _) => panic!("Attempt to compare UInt16 with non-UInt16"),
253 (UInt32(v1), UInt32(v2)) => v1.cmp(v2),
254 (UInt32(v1), Null) => {
255 if v1.is_none() {
256 Ordering::Equal
257 } else {
258 Ordering::Greater
259 }
260 }
261 (UInt32(_), _) => panic!("Attempt to compare UInt32 with non-UInt32"),
262 (UInt64(v1), UInt64(v2)) => v1.cmp(v2),
263 (UInt64(v1), Null) => {
264 if v1.is_none() {
265 Ordering::Equal
266 } else {
267 Ordering::Greater
268 }
269 }
270 (UInt64(_), _) => panic!("Attempt to compare UInt64 with non-UInt64"),
271 (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Utf8(v2) | Utf8View(v2) | LargeUtf8(v2)) => {
272 v1.cmp(v2)
273 }
274 (Utf8(v1) | Utf8View(v1) | LargeUtf8(v1), Null) => {
275 if v1.is_none() {
276 Ordering::Equal
277 } else {
278 Ordering::Greater
279 }
280 }
281 (Utf8(_) | Utf8View(_) | LargeUtf8(_), _) => {
282 panic!("Attempt to compare Utf8 with non-Utf8")
283 }
284 (
285 Binary(v1) | LargeBinary(v1) | BinaryView(v1),
286 Binary(v2) | LargeBinary(v2) | BinaryView(v2),
287 ) => v1.cmp(v2),
288 (Binary(v1) | LargeBinary(v1) | BinaryView(v1), Null) => {
289 if v1.is_none() {
290 Ordering::Equal
291 } else {
292 Ordering::Greater
293 }
294 }
295 (Binary(_) | LargeBinary(_) | BinaryView(_), _) => {
296 panic!("Attempt to compare Binary with non-Binary")
297 }
298 (FixedSizeBinary(_, v1), FixedSizeBinary(_, v2)) => v1.cmp(v2),
299 (FixedSizeBinary(_, v1), Null) => {
300 if v1.is_none() {
301 Ordering::Equal
302 } else {
303 Ordering::Greater
304 }
305 }
306 (FixedSizeBinary(_, _), _) => {
307 panic!("Attempt to compare FixedSizeBinary with non-FixedSizeBinary")
308 }
309 (FixedSizeList(left), FixedSizeList(right)) => {
310 if left.eq(right) {
311 todo!()
312 } else {
313 panic!(
314 "Attempt to compare fixed size list elements with different widths/fields"
315 )
316 }
317 }
318 (FixedSizeList(left), Null) => {
319 if left.is_null(0) {
320 Ordering::Equal
321 } else {
322 Ordering::Greater
323 }
324 }
325 (FixedSizeList(_), _) => {
326 panic!("Attempt to compare FixedSizeList with non-FixedSizeList")
327 }
328 (List(_), List(_)) => todo!(),
329 (List(left), Null) => {
330 if left.is_null(0) {
331 Ordering::Equal
332 } else {
333 Ordering::Greater
334 }
335 }
336 (List(_), _) => {
337 panic!("Attempt to compare List with non-List")
338 }
339 (LargeList(_), _) => todo!(),
340 (Map(_), Map(_)) => todo!(),
341 (Map(left), Null) => {
342 if left.is_null(0) {
343 Ordering::Equal
344 } else {
345 Ordering::Greater
346 }
347 }
348 (Map(_), _) => {
349 panic!("Attempt to compare Map with non-Map")
350 }
351 (Date32(v1), Date32(v2)) => v1.cmp(v2),
352 (Date32(v1), Null) => {
353 if v1.is_none() {
354 Ordering::Equal
355 } else {
356 Ordering::Greater
357 }
358 }
359 (Date32(_), _) => panic!("Attempt to compare Date32 with non-Date32"),
360 (Date64(v1), Date64(v2)) => v1.cmp(v2),
361 (Date64(v1), Null) => {
362 if v1.is_none() {
363 Ordering::Equal
364 } else {
365 Ordering::Greater
366 }
367 }
368 (Date64(_), _) => panic!("Attempt to compare Date64 with non-Date64"),
369 (Time32Second(v1), Time32Second(v2)) => v1.cmp(v2),
370 (Time32Second(v1), Null) => {
371 if v1.is_none() {
372 Ordering::Equal
373 } else {
374 Ordering::Greater
375 }
376 }
377 (Time32Second(_), _) => panic!("Attempt to compare Time32Second with non-Time32Second"),
378 (Time32Millisecond(v1), Time32Millisecond(v2)) => v1.cmp(v2),
379 (Time32Millisecond(v1), Null) => {
380 if v1.is_none() {
381 Ordering::Equal
382 } else {
383 Ordering::Greater
384 }
385 }
386 (Time32Millisecond(_), _) => {
387 panic!("Attempt to compare Time32Millisecond with non-Time32Millisecond")
388 }
389 (Time64Microsecond(v1), Time64Microsecond(v2)) => v1.cmp(v2),
390 (Time64Microsecond(v1), Null) => {
391 if v1.is_none() {
392 Ordering::Equal
393 } else {
394 Ordering::Greater
395 }
396 }
397 (Time64Microsecond(_), _) => {
398 panic!("Attempt to compare Time64Microsecond with non-Time64Microsecond")
399 }
400 (Time64Nanosecond(v1), Time64Nanosecond(v2)) => v1.cmp(v2),
401 (Time64Nanosecond(v1), Null) => {
402 if v1.is_none() {
403 Ordering::Equal
404 } else {
405 Ordering::Greater
406 }
407 }
408 (Time64Nanosecond(_), _) => {
409 panic!("Attempt to compare Time64Nanosecond with non-Time64Nanosecond")
410 }
411 (TimestampSecond(v1, _), TimestampSecond(v2, _)) => v1.cmp(v2),
412 (TimestampSecond(v1, _), Null) => {
413 if v1.is_none() {
414 Ordering::Equal
415 } else {
416 Ordering::Greater
417 }
418 }
419 (TimestampSecond(_, _), _) => {
420 panic!("Attempt to compare TimestampSecond with non-TimestampSecond")
421 }
422 (TimestampMillisecond(v1, _), TimestampMillisecond(v2, _)) => v1.cmp(v2),
423 (TimestampMillisecond(v1, _), Null) => {
424 if v1.is_none() {
425 Ordering::Equal
426 } else {
427 Ordering::Greater
428 }
429 }
430 (TimestampMillisecond(_, _), _) => {
431 panic!("Attempt to compare TimestampMillisecond with non-TimestampMillisecond")
432 }
433 (TimestampMicrosecond(v1, _), TimestampMicrosecond(v2, _)) => v1.cmp(v2),
434 (TimestampMicrosecond(v1, _), Null) => {
435 if v1.is_none() {
436 Ordering::Equal
437 } else {
438 Ordering::Greater
439 }
440 }
441 (TimestampMicrosecond(_, _), _) => {
442 panic!("Attempt to compare TimestampMicrosecond with non-TimestampMicrosecond")
443 }
444 (TimestampNanosecond(v1, _), TimestampNanosecond(v2, _)) => v1.cmp(v2),
445 (TimestampNanosecond(v1, _), Null) => {
446 if v1.is_none() {
447 Ordering::Equal
448 } else {
449 Ordering::Greater
450 }
451 }
452 (TimestampNanosecond(_, _), _) => {
453 panic!("Attempt to compare TimestampNanosecond with non-TimestampNanosecond")
454 }
455 (IntervalYearMonth(v1), IntervalYearMonth(v2)) => v1.cmp(v2),
456 (IntervalYearMonth(v1), Null) => {
457 if v1.is_none() {
458 Ordering::Equal
459 } else {
460 Ordering::Greater
461 }
462 }
463 (IntervalYearMonth(_), _) => {
464 panic!("Attempt to compare IntervalYearMonth with non-IntervalYearMonth")
465 }
466 (IntervalDayTime(v1), IntervalDayTime(v2)) => v1.cmp(v2),
467 (IntervalDayTime(v1), Null) => {
468 if v1.is_none() {
469 Ordering::Equal
470 } else {
471 Ordering::Greater
472 }
473 }
474 (IntervalDayTime(_), _) => {
475 panic!("Attempt to compare IntervalDayTime with non-IntervalDayTime")
476 }
477 (IntervalMonthDayNano(v1), IntervalMonthDayNano(v2)) => v1.cmp(v2),
478 (IntervalMonthDayNano(v1), Null) => {
479 if v1.is_none() {
480 Ordering::Equal
481 } else {
482 Ordering::Greater
483 }
484 }
485 (IntervalMonthDayNano(_), _) => {
486 panic!("Attempt to compare IntervalMonthDayNano with non-IntervalMonthDayNano")
487 }
488 (DurationSecond(v1), DurationSecond(v2)) => v1.cmp(v2),
489 (DurationSecond(v1), Null) => {
490 if v1.is_none() {
491 Ordering::Equal
492 } else {
493 Ordering::Greater
494 }
495 }
496 (DurationSecond(_), _) => {
497 panic!("Attempt to compare DurationSecond with non-DurationSecond")
498 }
499 (DurationMillisecond(v1), DurationMillisecond(v2)) => v1.cmp(v2),
500 (DurationMillisecond(v1), Null) => {
501 if v1.is_none() {
502 Ordering::Equal
503 } else {
504 Ordering::Greater
505 }
506 }
507 (DurationMillisecond(_), _) => {
508 panic!("Attempt to compare DurationMillisecond with non-DurationMillisecond")
509 }
510 (DurationMicrosecond(v1), DurationMicrosecond(v2)) => v1.cmp(v2),
511 (DurationMicrosecond(v1), Null) => {
512 if v1.is_none() {
513 Ordering::Equal
514 } else {
515 Ordering::Greater
516 }
517 }
518 (DurationMicrosecond(_), _) => {
519 panic!("Attempt to compare DurationMicrosecond with non-DurationMicrosecond")
520 }
521 (DurationNanosecond(v1), DurationNanosecond(v2)) => v1.cmp(v2),
522 (DurationNanosecond(v1), Null) => {
523 if v1.is_none() {
524 Ordering::Equal
525 } else {
526 Ordering::Greater
527 }
528 }
529 (DurationNanosecond(_), _) => {
530 panic!("Attempt to compare DurationNanosecond with non-DurationNanosecond")
531 }
532 (Struct(_arr), Struct(_arr2)) => todo!(),
533 (Struct(arr), Null) => {
534 if arr.is_empty() {
535 Ordering::Equal
536 } else {
537 Ordering::Greater
538 }
539 }
540 (Struct(_arr), _) => panic!("Attempt to compare Struct with non-Struct"),
541 (Dictionary(_k1, _v1), Dictionary(_k2, _v2)) => todo!(),
542 (Dictionary(_, v1), Null) => Self(*v1.clone()).cmp(&Self(ScalarValue::Null)),
543 (Dictionary(_, _), _) => panic!("Attempt to compare Dictionary with non-Dictionary"),
544 (Union(_, _, _), _) => todo!("Support for union scalars"),
546 (Null, Null) => Ordering::Equal,
547 (Null, _) => todo!(),
548 }
549 }
550}
551
552#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
553struct PageRecord {
554 max: OrderableScalarValue,
555 page_number: u32,
556}
557
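/// Extension trait used by the page lookup: `largest_node_less` returns the entry with the
/// greatest key strictly less than `key`, which is how a query's lower bound is widened to
/// the page whose minimum precedes the requested value.
///
/// A small illustrative sketch (the map contents are hypothetical):
///
/// ```ignore
/// let map: BTreeMap<i32, &str> = [(1, "a"), (5, "b"), (9, "c")].into_iter().collect();
/// assert_eq!(map.largest_node_less(&5), Some((&1, &"a")));
/// assert_eq!(map.largest_node_less(&1), None);
/// ```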
558trait BTreeMapExt<K, V> {
559 fn largest_node_less(&self, key: &K) -> Option<(&K, &V)>;
560}
561
562impl<K: Ord, V> BTreeMapExt<K, V> for BTreeMap<K, V> {
563 fn largest_node_less(&self, key: &K) -> Option<(&K, &V)> {
564 self.range((Bound::Unbounded, Bound::Excluded(key)))
565 .next_back()
566 }
567}
568
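/// In-memory lookup table mapping the minimum value of each page to the [`PageRecord`]s
/// (maximum value and page number) of the pages that begin at that value, plus the list of
/// pages containing at least one null.  Queries are answered by walking the `BTreeMap` range
/// that could overlap the requested value or range.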
569#[derive(Debug, DeepSizeOf, PartialEq, Eq)]
571pub struct BTreeLookup {
572 tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
573 null_pages: Vec<u32>,
575}
576
577impl BTreeLookup {
578 fn new(tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>, null_pages: Vec<u32>) -> Self {
579 Self { tree, null_pages }
580 }
581
582 fn pages_eq(&self, query: &OrderableScalarValue) -> Vec<u32> {
584 if query.0.is_null() {
585 self.pages_null()
586 } else {
587 self.pages_between((Bound::Included(query), Bound::Excluded(query)))
588 }
589 }
590
591 fn pages_in(&self, values: impl IntoIterator<Item = OrderableScalarValue>) -> Vec<u32> {
593 let page_lists = values
594 .into_iter()
595 .map(|val| self.pages_eq(&val))
596 .collect::<Vec<_>>();
597 let total_size = page_lists.iter().map(|set| set.len()).sum();
598 let mut heap = BinaryHeap::with_capacity(total_size);
599 for page_list in page_lists {
600 heap.extend(page_list);
601 }
602 let mut all_pages = heap.into_sorted_vec();
603 all_pages.dedup();
604 all_pages
605 }
606
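    /// Returns the page numbers of all pages that might contain values in `range`.
    ///
    /// The lower bound is widened to the key of the page whose minimum lies strictly below
    /// the requested bound (that page may still contain matching values), and candidate
    /// pages whose recorded maximum falls below that lower bound are filtered out.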
607 fn pages_between(
609 &self,
610 range: (Bound<&OrderableScalarValue>, Bound<&OrderableScalarValue>),
611 ) -> Vec<u32> {
612 let lower_bound = match range.0 {
615 Bound::Unbounded => Bound::Unbounded,
616 Bound::Included(lower) => self
623 .tree
624 .largest_node_less(lower)
625 .map(|val| Bound::Included(val.0))
626 .unwrap_or(Bound::Unbounded),
627 Bound::Excluded(lower) => self
628 .tree
629 .largest_node_less(lower)
630 .map(|val| Bound::Included(val.0))
631 .unwrap_or(Bound::Unbounded),
632 };
633 let upper_bound = match range.1 {
634 Bound::Unbounded => Bound::Unbounded,
635 Bound::Included(upper) => Bound::Included(upper),
636 Bound::Excluded(upper) => Bound::Included(upper),
645 };
646
647 match (lower_bound, upper_bound) {
648 (Bound::Excluded(lower), Bound::Excluded(upper))
649 | (Bound::Excluded(lower), Bound::Included(upper))
650 | (Bound::Included(lower), Bound::Excluded(upper)) => {
651 if lower >= upper {
654 return vec![];
655 }
656 }
657 (Bound::Included(lower), Bound::Included(upper)) => {
658 if lower > upper {
659 return vec![];
660 }
661 }
662 _ => {}
663 }
664
665 let candidates = self
666 .tree
667 .range((lower_bound, upper_bound))
668 .flat_map(|val| val.1);
669 match lower_bound {
670 Bound::Unbounded => candidates.map(|val| val.page_number).collect(),
671 Bound::Included(lower_bound) => candidates
672 .filter(|val| val.max.cmp(lower_bound) != Ordering::Less)
673 .map(|val| val.page_number)
674 .collect(),
675 Bound::Excluded(lower_bound) => candidates
676 .filter(|val| val.max.cmp(lower_bound) == Ordering::Greater)
677 .map(|val| val.page_number)
678 .collect(),
679 }
680 }
681
682 fn pages_null(&self) -> Vec<u32> {
683 self.null_pages.clone()
684 }
685}
686
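/// Opens the page data file (`page_data.lance`) lazily on first use and shares the reader
/// across concurrent page loads.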
687#[derive(Clone)]
690struct LazyIndexReader {
691 index_reader: Arc<tokio::sync::Mutex<Option<Arc<dyn IndexReader>>>>,
692 store: Arc<dyn IndexStore>,
693}
694
695impl LazyIndexReader {
696 fn new(store: Arc<dyn IndexStore>) -> Self {
697 Self {
698 index_reader: Arc::new(tokio::sync::Mutex::new(None)),
699 store,
700 }
701 }
702
703 async fn get(&self) -> Result<Arc<dyn IndexReader>> {
704 let mut reader = self.index_reader.lock().await;
705 if reader.is_none() {
706 let index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
707 *reader = Some(index_reader);
708 }
709 Ok(reader.as_ref().unwrap().clone())
710 }
711}
712
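/// Newtype wrapper so a loaded page (an `Arc<dyn ScalarIndex>`) can be stored in the Lance
/// index cache under a [`BTreePageKey`].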
713#[derive(Debug, Clone, DeepSizeOf)]
728pub struct CachedScalarIndex(Arc<dyn ScalarIndex>);
729
730impl CachedScalarIndex {
731 pub fn new(index: Arc<dyn ScalarIndex>) -> Self {
732 Self(index)
733 }
734
735 pub fn into_inner(self) -> Arc<dyn ScalarIndex> {
736 self.0
737 }
738}
739
740#[derive(Debug, Clone)]
741pub struct BTreePageKey {
742 pub page_number: u32,
743}
744
745impl CacheKey for BTreePageKey {
746 type ValueType = CachedScalarIndex;
747
748 fn key(&self) -> std::borrow::Cow<'_, str> {
749 format!("page-{}", self.page_number).into()
750 }
751}
752
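/// A scalar index that partitions sorted values into fixed-size pages.
///
/// The index is stored as two files: `page_lookup.lance` holds one row per page with the
/// page's min, max, null count, and page number, and `page_data.lance` holds the sorted
/// (value, row id) pairs themselves.  At query time the in-memory [`BTreeLookup`] narrows
/// the search to candidate pages, each of which is then loaded (and cached) as an in-memory
/// sub-index and searched exactly.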
753#[derive(Clone, Debug)]
756pub struct BTreeIndex {
757 page_lookup: Arc<BTreeLookup>,
758 index_cache: WeakLanceCache,
759 store: Arc<dyn IndexStore>,
760 sub_index: Arc<dyn BTreeSubIndex>,
761 batch_size: u64,
762 frag_reuse_index: Option<Arc<FragReuseIndex>>,
763}
764
765impl DeepSizeOf for BTreeIndex {
766 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
767 self.page_lookup.deep_size_of_children(context) + self.store.deep_size_of_children(context)
770 }
771}
772
773impl BTreeIndex {
774 fn new(
775 tree: BTreeMap<OrderableScalarValue, Vec<PageRecord>>,
776 null_pages: Vec<u32>,
777 store: Arc<dyn IndexStore>,
778 index_cache: WeakLanceCache,
779 sub_index: Arc<dyn BTreeSubIndex>,
780 batch_size: u64,
781 frag_reuse_index: Option<Arc<FragReuseIndex>>,
782 ) -> Self {
783 let page_lookup = Arc::new(BTreeLookup::new(tree, null_pages));
784 Self {
785 page_lookup,
786 store,
787 index_cache,
788 sub_index,
789 batch_size,
790 frag_reuse_index,
791 }
792 }
793
794 async fn lookup_page(
795 &self,
796 page_number: u32,
797 index_reader: LazyIndexReader,
798 metrics: &dyn MetricsCollector,
799 ) -> Result<Arc<dyn ScalarIndex>> {
800 self.index_cache
801 .get_or_insert_with_key(BTreePageKey { page_number }, move || async move {
802 let result = self.read_page(page_number, index_reader, metrics).await?;
803 Ok(CachedScalarIndex::new(result))
804 })
805 .await
806 .map(|v| v.as_ref().clone().into_inner())
807 }
808
809 async fn read_page(
810 &self,
811 page_number: u32,
812 index_reader: LazyIndexReader,
813 metrics: &dyn MetricsCollector,
814 ) -> Result<Arc<dyn ScalarIndex>> {
815 metrics.record_part_load();
816 info!(target: TRACE_IO_EVENTS, r#type=IO_TYPE_LOAD_SCALAR_PART, index_type="btree", part_id=page_number);
817 let index_reader = index_reader.get().await?;
818 let mut serialized_page = index_reader
819 .read_record_batch(page_number as u64, self.batch_size)
820 .await?;
821 if let Some(frag_reuse_index_ref) = self.frag_reuse_index.as_ref() {
822 serialized_page =
823 frag_reuse_index_ref.remap_row_ids_record_batch(serialized_page, 1)?;
824 }
825 let result = self.sub_index.load_subindex(serialized_page).await?;
826 Ok(result)
827 }
828
829 async fn search_page(
830 &self,
831 query: &SargableQuery,
832 page_number: u32,
833 index_reader: LazyIndexReader,
834 metrics: &dyn MetricsCollector,
835 ) -> Result<RowIdTreeMap> {
836 let subindex = self.lookup_page(page_number, index_reader, metrics).await?;
837 match subindex.search(query, metrics).await? {
842 SearchResult::Exact(map) => Ok(map),
843 _ => Err(Error::Internal {
844 message: "BTree sub-indices need to return exact results".to_string(),
845 location: location!(),
846 }),
847 }
848 }
849
850 fn try_from_serialized(
851 data: RecordBatch,
852 store: Arc<dyn IndexStore>,
853 index_cache: &LanceCache,
854 batch_size: u64,
855 frag_reuse_index: Option<Arc<FragReuseIndex>>,
856 ) -> Result<Self> {
857 let mut map = BTreeMap::<OrderableScalarValue, Vec<PageRecord>>::new();
858 let mut null_pages = Vec::<u32>::new();
859
860 if data.num_rows() == 0 {
861 let data_type = data.column(0).data_type().clone();
862 let sub_index = Arc::new(FlatIndexMetadata::new(data_type));
863 return Ok(Self::new(
864 map,
865 null_pages,
866 store,
867 WeakLanceCache::from(index_cache),
868 sub_index,
869 batch_size,
870 frag_reuse_index,
871 ));
872 }
873
874 let mins = data.column(0);
875 let maxs = data.column(1);
876 let null_counts = data
877 .column(2)
878 .as_any()
879 .downcast_ref::<UInt32Array>()
880 .unwrap();
881 let page_numbers = data
882 .column(3)
883 .as_any()
884 .downcast_ref::<UInt32Array>()
885 .unwrap();
886
887 for idx in 0..data.num_rows() {
888 let min = OrderableScalarValue(ScalarValue::try_from_array(&mins, idx)?);
889 let max = OrderableScalarValue(ScalarValue::try_from_array(&maxs, idx)?);
890 let null_count = null_counts.values()[idx];
891 let page_number = page_numbers.values()[idx];
892
893 if !max.0.is_null() {
895 map.entry(min)
896 .or_default()
897 .push(PageRecord { max, page_number });
898 }
899
900 if null_count > 0 {
901 null_pages.push(page_number);
902 }
903 }
904
905 let last_max = ScalarValue::try_from_array(&maxs, data.num_rows() - 1)?;
906 map.entry(OrderableScalarValue(last_max)).or_default();
907
908 let data_type = mins.data_type();
909
910 let sub_index = Arc::new(FlatIndexMetadata::new(data_type.clone()));
912
913 Ok(Self::new(
914 map,
915 null_pages,
916 store,
917 WeakLanceCache::from(index_cache),
918 sub_index,
919 batch_size,
920 frag_reuse_index,
921 ))
922 }
923
924 async fn load(
925 store: Arc<dyn IndexStore>,
926 frag_reuse_index: Option<Arc<FragReuseIndex>>,
927 index_cache: &LanceCache,
928 ) -> Result<Arc<Self>> {
929 let page_lookup_file = store.open_index_file(BTREE_LOOKUP_NAME).await?;
930 let num_rows_in_lookup = page_lookup_file.num_rows();
931 let serialized_lookup = page_lookup_file
932 .read_range(0..num_rows_in_lookup, None)
933 .await?;
934 let file_schema = page_lookup_file.schema();
935 let batch_size = file_schema
936 .metadata
937 .get(BATCH_SIZE_META_KEY)
938 .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
939 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
940 Ok(Arc::new(Self::try_from_serialized(
941 serialized_lookup,
942 store,
943 index_cache,
944 batch_size,
945 frag_reuse_index,
946 )?))
947 }
948
949 async fn into_data_stream(self) -> Result<SendableRecordBatchStream> {
951 let reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
952 let schema = self.sub_index.schema().clone();
953 let value_field = schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
954 let row_id_field = schema.field(1).clone().with_name(ROW_ID);
955 let new_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
956 let new_schema_clone = new_schema.clone();
957 let reader_stream = IndexReaderStream::new(reader, self.batch_size).await;
958 let batches = reader_stream
959 .map(|fut| fut.map_err(DataFusionError::from))
960 .buffered(self.store.io_parallelism())
961 .map_ok(move |batch| {
962 RecordBatch::try_new(
963 new_schema.clone(),
964 vec![batch.column(0).clone(), batch.column(1).clone()],
965 )
966 .unwrap()
967 })
968 .boxed();
969 Ok(Box::pin(RecordBatchStreamAdapter::new(
970 new_schema_clone,
971 batches,
972 )))
973 }
974
975 async fn into_old_data(self) -> Result<Arc<dyn ExecutionPlan>> {
976 let stream = self.into_data_stream().await?;
977 Ok(Arc::new(OneShotExec::new(stream)))
978 }
979
980 async fn combine_old_new(
981 self,
982 new_data: SendableRecordBatchStream,
983 chunk_size: u64,
984 ) -> Result<SendableRecordBatchStream> {
985 let value_column_index = new_data.schema().index_of(VALUE_COLUMN_NAME)?;
986
987 let new_input = Arc::new(OneShotExec::new(new_data));
988 let old_input = self.into_old_data().await?;
989 debug_assert_eq!(
990 old_input.schema().flattened_fields().len(),
991 new_input.schema().flattened_fields().len()
992 );
993
994 let sort_expr = PhysicalSortExpr {
995 expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
996 options: SortOptions {
997 descending: false,
998 nulls_first: true,
999 },
1000 };
1001 let all_data = Arc::new(UnionExec::new(vec![old_input, new_input]));
1004 let ordered = Arc::new(SortPreservingMergeExec::new([sort_expr].into(), all_data));
1005
1006 let unchunked = execute_plan(
1007 ordered,
1008 LanceExecutionOptions {
1009 use_spilling: true,
1010 ..Default::default()
1011 },
1012 )?;
1013 Ok(chunk_concat_stream(unchunked, chunk_size as usize))
1014 }
1015}
1016
1017fn wrap_bound(bound: &Bound<ScalarValue>) -> Bound<OrderableScalarValue> {
1018 match bound {
1019 Bound::Unbounded => Bound::Unbounded,
1020 Bound::Included(val) => Bound::Included(OrderableScalarValue(val.clone())),
1021 Bound::Excluded(val) => Bound::Excluded(OrderableScalarValue(val.clone())),
1022 }
1023}
1024
1025fn serialize_with_display<T: Display, S: Serializer>(
1026 value: &Option<T>,
1027 serializer: S,
1028) -> std::result::Result<S::Ok, S::Error> {
1029 if let Some(value) = value {
1030 serializer.collect_str(value)
1031 } else {
1032 serializer.collect_str("N/A")
1033 }
1034}
1035
1036#[derive(Serialize)]
1037struct BTreeStatistics {
1038 #[serde(serialize_with = "serialize_with_display")]
1039 min: Option<OrderableScalarValue>,
1040 #[serde(serialize_with = "serialize_with_display")]
1041 max: Option<OrderableScalarValue>,
1042 num_pages: u32,
1043}
1044
1045#[async_trait]
1046impl Index for BTreeIndex {
1047 fn as_any(&self) -> &dyn Any {
1048 self
1049 }
1050
1051 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
1052 self
1053 }
1054
1055 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
1056 Err(Error::NotSupported {
1057 source: "BTreeIndex is not vector index".into(),
1058 location: location!(),
1059 })
1060 }
1061
1062 async fn prewarm(&self) -> Result<()> {
1063 let index_reader = LazyIndexReader::new(self.store.clone());
1064 let reader = index_reader.get().await?;
1065 let num_rows = reader.num_rows();
1066 let batch_size = self.batch_size as usize;
1067 let num_pages = num_rows.div_ceil(batch_size);
1068 let mut pages = stream::iter(0..num_pages)
1069 .map(|page_idx| {
1070 let index_reader = index_reader.clone();
1071 let page_idx = page_idx as u32;
1072 async move {
1073 let page = self
1074 .read_page(page_idx, index_reader, &NoOpMetricsCollector)
1075 .await?;
1076 Result::Ok((page_idx, page))
1077 }
1078 })
1079 .buffer_unordered(get_num_compute_intensive_cpus());
1080
1081 while let Some((page_idx, page)) = pages.try_next().await? {
1082 let inserted = self
1083 .index_cache
1084 .insert_with_key(
1085 &BTreePageKey {
1086 page_number: page_idx,
1087 },
1088 Arc::new(CachedScalarIndex::new(page)),
1089 )
1090 .await;
1091
1092 if !inserted {
1093 return Err(Error::Internal {
1094 message: "Failed to prewarm index: cache is no longer available".to_string(),
1095 location: location!(),
1096 });
1097 }
1098 }
1099
1100 Ok(())
1101 }
1102
1103 fn index_type(&self) -> IndexType {
1104 IndexType::BTree
1105 }
1106
1107 fn statistics(&self) -> Result<serde_json::Value> {
1108 let min = self
1109 .page_lookup
1110 .tree
1111 .first_key_value()
1112 .map(|(k, _)| k.clone());
1113 let max = self
1114 .page_lookup
1115 .tree
1116 .last_key_value()
1117 .map(|(k, _)| k.clone());
1118 serde_json::to_value(&BTreeStatistics {
1119 num_pages: self.page_lookup.tree.len() as u32,
1120 min,
1121 max,
1122 })
1123 .map_err(|err| err.into())
1124 }
1125
1126 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
1127 let mut frag_ids = RoaringBitmap::default();
1128
1129 let sub_index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
1130 let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1131 .await
1132 .buffered(self.store.io_parallelism());
1133 while let Some(serialized) = reader_stream.try_next().await? {
1134 let page = self.sub_index.load_subindex(serialized).await?;
1135 frag_ids |= page.calculate_included_frags().await?;
1136 }
1137
1138 Ok(frag_ids)
1139 }
1140}
1141
1142#[async_trait]
1143impl ScalarIndex for BTreeIndex {
1144 async fn search(
1145 &self,
1146 query: &dyn AnyQuery,
1147 metrics: &dyn MetricsCollector,
1148 ) -> Result<SearchResult> {
1149 let query = query.as_any().downcast_ref::<SargableQuery>().unwrap();
1150 let pages = match query {
1151 SargableQuery::Equals(val) => self
1152 .page_lookup
1153 .pages_eq(&OrderableScalarValue(val.clone())),
1154 SargableQuery::Range(start, end) => self
1155 .page_lookup
1156 .pages_between((wrap_bound(start).as_ref(), wrap_bound(end).as_ref())),
1157 SargableQuery::IsIn(values) => self
1158 .page_lookup
1159 .pages_in(values.iter().map(|val| OrderableScalarValue(val.clone()))),
1160 SargableQuery::FullTextSearch(_) => return Err(Error::invalid_input(
 1161 "full text search is not supported for BTree index, build an inverted index for it",
1162 location!(),
1163 )),
1164 SargableQuery::IsNull() => self.page_lookup.pages_null(),
1165 };
1166 let lazy_index_reader = LazyIndexReader::new(self.store.clone());
1167 let page_tasks = pages
1168 .into_iter()
1169 .map(|page_index| {
1170 self.search_page(query, page_index, lazy_index_reader.clone(), metrics)
1171 .boxed()
1172 })
1173 .collect::<Vec<_>>();
1174 debug!("Searching {} btree pages", page_tasks.len());
1175 let row_ids = stream::iter(page_tasks)
1176 .buffered(get_num_compute_intensive_cpus())
1179 .try_collect::<RowIdTreeMap>()
1180 .await?;
1181 Ok(SearchResult::Exact(row_ids))
1182 }
1183
1184 fn can_remap(&self) -> bool {
1185 true
1186 }
1187
1188 async fn remap(
1189 &self,
1190 mapping: &HashMap<u64, Option<u64>>,
1191 dest_store: &dyn IndexStore,
1192 ) -> Result<CreatedIndex> {
1193 let mut sub_index_file = dest_store
1195 .new_index_file(BTREE_PAGES_NAME, self.sub_index.schema().clone())
1196 .await?;
1197
1198 let sub_index_reader = self.store.open_index_file(BTREE_PAGES_NAME).await?;
1199 let mut reader_stream = IndexReaderStream::new(sub_index_reader, self.batch_size)
1200 .await
1201 .buffered(self.store.io_parallelism());
1202 while let Some(serialized) = reader_stream.try_next().await? {
1203 let remapped = self.sub_index.remap_subindex(serialized, mapping).await?;
1204 sub_index_file.write_record_batch(remapped).await?;
1205 }
1206
1207 sub_index_file.finish().await?;
1208
1209 self.store
1211 .copy_index_file(BTREE_LOOKUP_NAME, dest_store)
1212 .await?;
1213
1214 Ok(CreatedIndex {
1215 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1216 .unwrap(),
1217 index_version: BTREE_INDEX_VERSION,
1218 })
1219 }
1220
1221 async fn update(
1222 &self,
1223 new_data: SendableRecordBatchStream,
1224 dest_store: &dyn IndexStore,
1225 ) -> Result<CreatedIndex> {
1226 let merged_data_source = self
1228 .clone()
1229 .combine_old_new(new_data, self.batch_size)
1230 .await?;
1231 train_btree_index(
1232 merged_data_source,
1233 self.sub_index.as_ref(),
1234 dest_store,
1235 self.batch_size,
1236 None,
1237 )
1238 .await?;
1239
1240 Ok(CreatedIndex {
1241 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1242 .unwrap(),
1243 index_version: BTREE_INDEX_VERSION,
1244 })
1245 }
1246
1247 fn update_criteria(&self) -> UpdateCriteria {
1248 UpdateCriteria::only_new_data(TrainingCriteria::new(TrainingOrdering::Values).with_row_id())
1249 }
1250
1251 fn derive_index_params(&self) -> Result<ScalarIndexParams> {
1252 let params = serde_json::to_value(BTreeParameters {
1253 zone_size: Some(self.batch_size),
1254 })?;
1255 Ok(ScalarIndexParams::for_builtin(BuiltinIndexType::BTree).with_params(¶ms))
1256 }
1257}
1258
1259struct BatchStats {
1260 min: ScalarValue,
1261 max: ScalarValue,
1262 null_count: u32,
1263}
1264
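/// Computes per-page statistics for a training batch.
///
/// The batch is assumed to already be sorted by the value column (see
/// [`TrainingOrdering::Values`]), so the minimum is the first element and the maximum is
/// the last element.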
1265fn analyze_batch(batch: &RecordBatch) -> Result<BatchStats> {
1266 let values = batch.column_by_name(VALUE_COLUMN_NAME).expect_ok()?;
1267 if values.is_empty() {
1268 return Err(Error::Internal {
1269 message: "received an empty batch in btree training".to_string(),
1270 location: location!(),
1271 });
1272 }
1273 let min = ScalarValue::try_from_array(&values, 0).map_err(|e| Error::Internal {
1274 message: format!("failed to get min value from batch: {}", e),
1275 location: location!(),
1276 })?;
1277 let max =
1278 ScalarValue::try_from_array(&values, values.len() - 1).map_err(|e| Error::Internal {
1279 message: format!("failed to get max value from batch: {}", e),
1280 location: location!(),
1281 })?;
1282
1283 Ok(BatchStats {
1284 min,
1285 max,
1286 null_count: values.null_count() as u32,
1287 })
1288}
1289
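/// The per-page index used inside each btree page (e.g. [`FlatIndexMetadata`]): it trains,
/// serializes, loads, and remaps one page of sorted values at a time.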
1290#[async_trait]
1292pub trait BTreeSubIndex: Debug + Send + Sync + DeepSizeOf {
1293 async fn train(&self, batch: RecordBatch) -> Result<RecordBatch>;
1295
1296 async fn load_subindex(&self, serialized: RecordBatch) -> Result<Arc<dyn ScalarIndex>>;
1298
1299 async fn retrieve_data(&self, serialized: RecordBatch) -> Result<RecordBatch>;
1306
1307 fn schema(&self) -> &Arc<Schema>;
1309
1310 async fn remap_subindex(
1312 &self,
1313 serialized: RecordBatch,
1314 mapping: &HashMap<u64, Option<u64>>,
1315 ) -> Result<RecordBatch>;
1316}
1317
1318struct EncodedBatch {
1319 stats: BatchStats,
1320 page_number: u32,
1321}
1322
1323async fn train_btree_page(
1324 batch: RecordBatch,
1325 batch_idx: u32,
1326 sub_index_trainer: &dyn BTreeSubIndex,
1327 writer: &mut dyn IndexWriter,
1328) -> Result<EncodedBatch> {
1329 let stats = analyze_batch(&batch)?;
1330 let trained = sub_index_trainer.train(batch).await?;
1331 writer.write_record_batch(trained).await?;
1332 Ok(EncodedBatch {
1333 stats,
1334 page_number: batch_idx,
1335 })
1336}
1337
1338fn btree_stats_as_batch(stats: Vec<EncodedBatch>, value_type: &DataType) -> Result<RecordBatch> {
1339 let mins = if stats.is_empty() {
1340 new_empty_array(value_type)
1341 } else {
1342 ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.min.clone()))?
1343 };
1344 let maxs = if stats.is_empty() {
1345 new_empty_array(value_type)
1346 } else {
1347 ScalarValue::iter_to_array(stats.iter().map(|stat| stat.stats.max.clone()))?
1348 };
1349 let null_counts = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.stats.null_count));
1350 let page_numbers = UInt32Array::from_iter_values(stats.iter().map(|stat| stat.page_number));
1351
1352 let schema = Arc::new(Schema::new(vec![
1353 Field::new("min", mins.data_type().clone(), true),
1355 Field::new("max", maxs.data_type().clone(), true),
1356 Field::new("null_count", null_counts.data_type().clone(), false),
1357 Field::new("page_idx", page_numbers.data_type().clone(), false),
1358 ]));
1359
1360 let columns = vec![
1361 mins,
1362 maxs,
1363 Arc::new(null_counts) as Arc<dyn Array>,
1364 Arc::new(page_numbers) as Arc<dyn Array>,
1365 ];
1366
1367 Ok(RecordBatch::try_new(schema, columns)?)
1368}
1369
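/// Trains a btree index from a stream of batches sorted by the value column.
///
/// Each chunk of `batch_size` rows becomes one page written to the page data file, and a
/// per-page (min, max, null_count, page_idx) summary is written to the lookup file.  When
/// `fragment_ids` is provided, partitioned `part_*` file names are used so the partial
/// results can be merged later with [`merge_index_files`].
///
/// A hedged usage sketch (the sorted stream and the store are assumed to exist; they are
/// not constructed here):
///
/// ```ignore
/// let trainer = FlatIndexMetadata::new(DataType::Int32);
/// train_btree_index(sorted_stream, &trainer, store.as_ref(), DEFAULT_BTREE_BATCH_SIZE, None)
///     .await?;
/// ```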
1370pub async fn train_btree_index(
1376 batches_source: SendableRecordBatchStream,
1377 sub_index_trainer: &dyn BTreeSubIndex,
1378 index_store: &dyn IndexStore,
1379 batch_size: u64,
1380 fragment_ids: Option<Vec<u32>>,
1381) -> Result<()> {
1382 let fragment_mask = fragment_ids.as_ref().and_then(|frag_ids| {
1383 if !frag_ids.is_empty() {
1384 Some((frag_ids[0] as u64) << 32)
1388 } else {
1389 None
1390 }
1391 });
1392
1393 let mut sub_index_file;
1394 if fragment_mask.is_none() {
1395 sub_index_file = index_store
1396 .new_index_file(BTREE_PAGES_NAME, sub_index_trainer.schema().clone())
1397 .await?;
1398 } else {
1399 sub_index_file = index_store
1400 .new_index_file(
1401 part_page_data_file_path(fragment_mask.unwrap()).as_str(),
1402 sub_index_trainer.schema().clone(),
1403 )
1404 .await?;
1405 }
1406
1407 let mut encoded_batches = Vec::new();
1408 let mut batch_idx = 0;
1409
1410 let value_type = batches_source
1411 .schema()
1412 .field_with_name(VALUE_COLUMN_NAME)?
1413 .data_type()
1414 .clone();
1415
1416 let mut batches_source = chunk_concat_stream(batches_source, batch_size as usize);
1417
1418 while let Some(batch) = batches_source.try_next().await? {
1419 encoded_batches.push(
1420 train_btree_page(batch, batch_idx, sub_index_trainer, sub_index_file.as_mut()).await?,
1421 );
1422 batch_idx += 1;
1423 }
1424 sub_index_file.finish().await?;
1425 let record_batch = btree_stats_as_batch(encoded_batches, &value_type)?;
1426 let mut file_schema = record_batch.schema().as_ref().clone();
1427 file_schema
1428 .metadata
1429 .insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
1430 let mut btree_index_file;
1431 if fragment_mask.is_none() {
1432 btree_index_file = index_store
1433 .new_index_file(BTREE_LOOKUP_NAME, Arc::new(file_schema))
1434 .await?;
1435 } else {
1436 btree_index_file = index_store
1437 .new_index_file(
1438 part_lookup_file_path(fragment_mask.unwrap()).as_str(),
1439 Arc::new(file_schema),
1440 )
1441 .await?;
1442 }
1443 btree_index_file.write_record_batch(record_batch).await?;
1444 btree_index_file.finish().await?;
1445 Ok(())
1446}
1447
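/// Merges per-partition btree files (written by [`train_btree_index`] with `fragment_ids`)
/// into a single `page_lookup.lance` / `page_data.lance` pair, then deletes the partial
/// `part_*` files.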
1448pub async fn merge_index_files(
1449 object_store: &ObjectStore,
1450 index_dir: &Path,
1451 store: Arc<dyn IndexStore>,
 1452 batch_readahead: Option<usize>,
1453) -> Result<()> {
1454 let (part_page_files, part_lookup_files) =
1456 list_page_lookup_files(object_store, index_dir).await?;
 1457 merge_metadata_files(store, &part_page_files, &part_lookup_files, batch_readahead).await
1458}
1459
1460async fn list_page_lookup_files(
1463 object_store: &ObjectStore,
1464 index_dir: &Path,
1465) -> Result<(Vec<String>, Vec<String>)> {
1466 let mut part_page_files = Vec::new();
1467 let mut part_lookup_files = Vec::new();
1468
1469 let mut list_stream = object_store.list(Some(index_dir.clone()));
1470
1471 while let Some(item) = list_stream.next().await {
1472 match item {
1473 Ok(meta) => {
1474 let file_name = meta.location.filename().unwrap_or_default();
1475 if file_name.starts_with("part_") && file_name.ends_with("_page_data.lance") {
1477 part_page_files.push(file_name.to_string());
1478 }
1479 if file_name.starts_with("part_") && file_name.ends_with("_page_lookup.lance") {
1481 part_lookup_files.push(file_name.to_string());
1482 }
1483 }
1484 Err(_) => continue,
1485 }
1486 }
1487
1488 if part_page_files.is_empty() || part_lookup_files.is_empty() {
1489 return Err(Error::Internal {
1490 message: format!(
1491 "No partition metadata files found in index directory: {} (page_files: {}, lookup_files: {})",
1492 index_dir, part_page_files.len(), part_lookup_files.len()
1493 ),
1494 location: location!(),
1495 });
1496 }
1497
1498 Ok((part_page_files, part_lookup_files))
1499}
1500
1501async fn merge_metadata_files(
1506 store: Arc<dyn IndexStore>,
1507 part_page_files: &[String],
1508 part_lookup_files: &[String],
 1509 batch_readahead: Option<usize>,
1510) -> Result<()> {
1511 if part_lookup_files.is_empty() || part_page_files.is_empty() {
1512 return Err(Error::Internal {
1513 message: "No partition files provided for merging".to_string(),
1514 location: location!(),
1515 });
1516 }
1517
1518 if part_lookup_files.len() != part_page_files.len() {
1520 return Err(Error::Internal {
1521 message: format!(
1522 "Number of partition lookup files ({}) does not match number of partition page files ({})",
1523 part_lookup_files.len(),
1524 part_page_files.len()
1525 ),
1526 location: location!(),
1527 });
1528 }
1529 let mut page_files_map = HashMap::new();
1530 for page_file in part_page_files {
1531 let partition_id = extract_partition_id(page_file)?;
1532 page_files_map.insert(partition_id, page_file);
1533 }
1534
1535 for lookup_file in part_lookup_files {
1537 let partition_id = extract_partition_id(lookup_file)?;
1538 if !page_files_map.contains_key(&partition_id) {
1539 return Err(Error::Internal {
1540 message: format!(
1541 "No corresponding page file found for lookup file: {} (partition_id: {})",
1542 lookup_file, partition_id
1543 ),
1544 location: location!(),
1545 });
1546 }
1547 }
1548
1549 let first_lookup_reader = store.open_index_file(&part_lookup_files[0]).await?;
1551 let batch_size = first_lookup_reader
1552 .schema()
1553 .metadata
1554 .get(BATCH_SIZE_META_KEY)
1555 .map(|bs| bs.parse().unwrap_or(DEFAULT_BTREE_BATCH_SIZE))
1556 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE);
1557
1558 let value_type = first_lookup_reader
1560 .schema()
1561 .fields
1562 .first()
1563 .unwrap()
1564 .data_type();
1565
1566 let partition_id = extract_partition_id(part_lookup_files[0].as_str())?;
1568 let page_file = page_files_map.get(&partition_id).unwrap();
1569 let page_reader = store.open_index_file(page_file).await?;
1570 let page_schema = page_reader.schema().clone();
1571
1572 let arrow_schema = Arc::new(Schema::from(&page_schema));
1573 let mut page_file = store
1574 .new_index_file(BTREE_PAGES_NAME, arrow_schema.clone())
1575 .await?;
1576
1577 let lookup_entries = merge_pages(
1579 part_lookup_files,
1580 &page_files_map,
1581 &store,
1582 batch_size,
1583 &mut page_file,
1584 arrow_schema.clone(),
 1585 batch_readahead,
1586 )
1587 .await?;
1588
1589 page_file.finish().await?;
1590
1591 let mut metadata = HashMap::new();
1594 metadata.insert(BATCH_SIZE_META_KEY.to_string(), batch_size.to_string());
1595
1596 let lookup_schema_with_metadata = Arc::new(Schema::new_with_metadata(
1597 vec![
1598 Field::new("min", value_type.clone(), true),
1599 Field::new("max", value_type, true),
1600 Field::new("null_count", DataType::UInt32, false),
1601 Field::new("page_idx", DataType::UInt32, false),
1602 ],
1603 metadata,
1604 ));
1605
1606 let lookup_batch = RecordBatch::try_new(
1607 lookup_schema_with_metadata.clone(),
1608 vec![
1609 ScalarValue::iter_to_array(lookup_entries.iter().map(|(min, _, _, _)| min.clone()))?,
1610 ScalarValue::iter_to_array(lookup_entries.iter().map(|(_, max, _, _)| max.clone()))?,
1611 Arc::new(UInt32Array::from_iter_values(
1612 lookup_entries
1613 .iter()
1614 .map(|(_, _, null_count, _)| *null_count),
1615 )),
1616 Arc::new(UInt32Array::from_iter_values(
1617 lookup_entries.iter().map(|(_, _, _, page_idx)| *page_idx),
1618 )),
1619 ],
1620 )?;
1621
1622 let mut lookup_file = store
1623 .new_index_file(BTREE_LOOKUP_NAME, lookup_schema_with_metadata)
1624 .await?;
1625 lookup_file.write_record_batch(lookup_batch).await?;
1626 lookup_file.finish().await?;
1627
1628 cleanup_partition_files(&store, part_lookup_files, part_page_files).await;
1631
1632 Ok(())
1633}
1634
1635async fn merge_pages(
1638 part_lookup_files: &[String],
1639 page_files_map: &HashMap<u64, &String>,
1640 store: &Arc<dyn IndexStore>,
1641 batch_size: u64,
1642 page_file: &mut Box<dyn IndexWriter>,
1643 arrow_schema: Arc<Schema>,
 1644 batch_readahead: Option<usize>,
1645) -> Result<Vec<(ScalarValue, ScalarValue, u32, u32)>> {
1646 let mut lookup_entries = Vec::new();
1647 let mut page_idx = 0u32;
1648
1649 debug!(
1650 "Starting SortPreservingMerge with {} partitions",
1651 part_lookup_files.len()
1652 );
1653
1654 let value_field = arrow_schema.field(0).clone().with_name(VALUE_COLUMN_NAME);
1655 let row_id_field = arrow_schema.field(1).clone().with_name(ROW_ID);
1656 let stream_schema = Arc::new(Schema::new(vec![value_field, row_id_field]));
1657
1658 let mut inputs: Vec<Arc<dyn ExecutionPlan>> = Vec::new();
1660 for lookup_file in part_lookup_files {
1661 let partition_id = extract_partition_id(lookup_file)?;
1662 let page_file_name =
1663 (*page_files_map
1664 .get(&partition_id)
1665 .ok_or_else(|| Error::Internal {
1666 message: format!("Page file not found for partition ID: {}", partition_id),
1667 location: location!(),
1668 })?)
1669 .clone();
1670
1671 let reader = store.open_index_file(&page_file_name).await?;
1672
1673 let reader_stream = IndexReaderStream::new(reader, batch_size).await;
1674
1675 let stream = reader_stream
1676 .map(|fut| fut.map_err(DataFusionError::from))
 1677 .buffered(batch_readahead.unwrap_or(1))
1678 .boxed();
1679
1680 let sendable_stream =
1681 Box::pin(RecordBatchStreamAdapter::new(stream_schema.clone(), stream));
1682 inputs.push(Arc::new(OneShotExec::new(sendable_stream)));
1683 }
1684
1685 let union_inputs = Arc::new(UnionExec::new(inputs));
1687
1688 let value_column_index = stream_schema.index_of(VALUE_COLUMN_NAME)?;
1690 let sort_expr = PhysicalSortExpr {
1691 expr: Arc::new(Column::new(VALUE_COLUMN_NAME, value_column_index)),
1692 options: SortOptions {
1693 descending: false,
1694 nulls_first: true,
1695 },
1696 };
1697
1698 let merge_exec = Arc::new(SortPreservingMergeExec::new(
1699 [sort_expr].into(),
1700 union_inputs,
1701 ));
1702
1703 let unchunked = execute_plan(
1704 merge_exec,
1705 LanceExecutionOptions {
1706 use_spilling: false,
1707 ..Default::default()
1708 },
1709 )?;
1710
1711 let mut chunked_stream = chunk_concat_stream(unchunked, batch_size as usize);
1713
1714 while let Some(batch) = chunked_stream.try_next().await? {
1716 let writer_batch = RecordBatch::try_new(
1717 arrow_schema.clone(),
1718 vec![batch.column(0).clone(), batch.column(1).clone()],
1719 )?;
1720
1721 page_file.write_record_batch(writer_batch).await?;
1722
1723 let min_val = ScalarValue::try_from_array(batch.column(0), 0)?;
1724 let max_val = ScalarValue::try_from_array(batch.column(0), batch.num_rows() - 1)?;
1725 let null_count = batch.column(0).null_count() as u32;
1726
1727 lookup_entries.push((min_val, max_val, null_count, page_idx));
1728 page_idx += 1;
1729 }
1730
1731 Ok(lookup_entries)
1732}
1733
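/// Extracts the partition id from a partitioned file name, e.g.
/// `"part_5_page_data.lance"` yields `5`.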
1734fn extract_partition_id(filename: &str) -> Result<u64> {
1737 if !filename.starts_with("part_") {
1738 return Err(Error::Internal {
1739 message: format!("Invalid partition file name format: {}", filename),
1740 location: location!(),
1741 });
1742 }
1743
1744 let parts: Vec<&str> = filename.split('_').collect();
1745 if parts.len() < 3 {
1746 return Err(Error::Internal {
1747 message: format!("Invalid partition file name format: {}", filename),
1748 location: location!(),
1749 });
1750 }
1751
1752 parts[1].parse::<u64>().map_err(|_| Error::Internal {
1753 message: format!("Failed to parse partition ID from filename: {}", filename),
1754 location: location!(),
1755 })
1756}
1757
1758async fn cleanup_partition_files(
1763 store: &Arc<dyn IndexStore>,
1764 part_lookup_files: &[String],
1765 part_page_files: &[String],
1766) {
1767 for file_name in part_lookup_files {
1769 cleanup_single_file(
1770 store,
1771 file_name,
1772 "part_",
1773 "_page_lookup.lance",
1774 "partition lookup",
1775 )
1776 .await;
1777 }
1778
1779 for file_name in part_page_files {
1781 cleanup_single_file(
1782 store,
1783 file_name,
1784 "part_",
1785 "_page_data.lance",
1786 "partition page",
1787 )
1788 .await;
1789 }
1790}
1791
1792async fn cleanup_single_file(
1796 store: &Arc<dyn IndexStore>,
1797 file_name: &str,
1798 expected_prefix: &str,
1799 expected_suffix: &str,
1800 file_type: &str,
1801) {
1802 if file_name.starts_with(expected_prefix) && file_name.ends_with(expected_suffix) {
1803 match store.delete_index_file(file_name).await {
1804 Ok(()) => {
1805 debug!("Successfully deleted {} file: {}", file_type, file_name);
1806 }
1807 Err(e) => {
1808 warn!(
1809 "Failed to delete {} file '{}': {}. \
1810 This does not affect the merge operation, but may leave \
1811 partition files that should be cleaned up manually.",
1812 file_type, file_name, e
1813 );
1814 }
1815 }
1816 } else {
1817 warn!(
1819 "Skipping deletion of file '{}' as it does not match the expected \
1820 {} file pattern ({}*{})",
1821 file_name, file_type, expected_prefix, expected_suffix
1822 );
1823 }
1824}
1825
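/// Name of the per-partition page data file, e.g. `part_page_data_file_path(5)` returns
/// `"part_5_page_data.lance"`.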
1826pub(crate) fn part_page_data_file_path(partition_id: u64) -> String {
1827 format!("part_{}_{}", partition_id, BTREE_PAGES_NAME)
1828}
1829
1830pub(crate) fn part_lookup_file_path(partition_id: u64) -> String {
1831 format!("part_{}_{}", partition_id, BTREE_LOOKUP_NAME)
1832}
1833
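/// A stream that yields one read future per batch in an index file; callers buffer the
/// stream (e.g. with `buffered(n)`) to read several pages concurrently.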
1834struct IndexReaderStream {
1838 reader: Arc<dyn IndexReader>,
1839 batch_size: u64,
1840 num_batches: u32,
1841 batch_idx: u32,
1842}
1843
1844impl IndexReaderStream {
1845 async fn new(reader: Arc<dyn IndexReader>, batch_size: u64) -> Self {
1846 let num_batches = reader.num_batches(batch_size).await;
1847 Self {
1848 reader,
1849 batch_size,
1850 num_batches,
1851 batch_idx: 0,
1852 }
1853 }
1854}
1855
1856impl Stream for IndexReaderStream {
1857 type Item = BoxFuture<'static, Result<RecordBatch>>;
1858
1859 fn poll_next(
1860 self: std::pin::Pin<&mut Self>,
1861 _cx: &mut std::task::Context<'_>,
1862 ) -> std::task::Poll<Option<Self::Item>> {
1863 let this = self.get_mut();
1864 if this.batch_idx >= this.num_batches {
1865 return std::task::Poll::Ready(None);
1866 }
1867 let batch_num = this.batch_idx;
1868 this.batch_idx += 1;
1869 let reader_copy = this.reader.clone();
1870 let batch_size = this.batch_size;
1871 let read_task = async move {
1872 reader_copy
1873 .read_record_batch(batch_num as u64, batch_size)
1874 .await
1875 }
1876 .boxed();
1877 std::task::Poll::Ready(Some(read_task))
1878 }
1879}
1880
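/// User-facing parameters for the btree index, deserialized from the JSON parameter string
/// passed to [`BTreeIndexPlugin::new_training_request`].  `zone_size` controls how many rows
/// go into each page, defaulting to `DEFAULT_BTREE_BATCH_SIZE`.
///
/// A minimal example of the expected JSON (the value shown is just the default):
///
/// ```ignore
/// let params: BTreeParameters = serde_json::from_str(r#"{"zone_size": 4096}"#)?;
/// ```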
1881#[derive(Debug, Serialize, Deserialize)]
1883pub struct BTreeParameters {
1884 pub zone_size: Option<u64>,
1886}
1887
1888struct BTreeTrainingRequest {
1889 parameters: BTreeParameters,
1890 criteria: TrainingCriteria,
1891}
1892
1893impl BTreeTrainingRequest {
1894 pub fn new(parameters: BTreeParameters) -> Self {
1895 Self {
1896 parameters,
1897 criteria: TrainingCriteria::new(TrainingOrdering::Values).with_row_id(),
1899 }
1900 }
1901}
1902
1903impl TrainingRequest for BTreeTrainingRequest {
1904 fn as_any(&self) -> &dyn std::any::Any {
1905 self
1906 }
1907
1908 fn criteria(&self) -> &TrainingCriteria {
1909 &self.criteria
1910 }
1911}
1912
1913#[derive(Debug, Default)]
1914pub struct BTreeIndexPlugin;
1915
1916#[async_trait]
1917impl ScalarIndexPlugin for BTreeIndexPlugin {
1918 fn name(&self) -> &str {
1919 "BTree"
1920 }
1921
1922 fn new_training_request(
1923 &self,
1924 params: &str,
1925 field: &Field,
1926 ) -> Result<Box<dyn TrainingRequest>> {
1927 if field.data_type().is_nested() {
1928 return Err(Error::InvalidInput {
1929 source: "A btree index can only be created on a non-nested field.".into(),
1930 location: location!(),
1931 });
1932 }
1933
1934 let params = serde_json::from_str::<BTreeParameters>(params)?;
1935 Ok(Box::new(BTreeTrainingRequest::new(params)))
1936 }
1937
1938 fn provides_exact_answer(&self) -> bool {
1939 true
1940 }
1941
1942 fn version(&self) -> u32 {
1943 BTREE_INDEX_VERSION
1944 }
1945
1946 fn new_query_parser(
1947 &self,
1948 index_name: String,
1949 _index_details: &prost_types::Any,
1950 ) -> Option<Box<dyn ScalarQueryParser>> {
1951 Some(Box::new(SargableQueryParser::new(index_name, false)))
1952 }
1953
1954 async fn train_index(
1955 &self,
1956 data: SendableRecordBatchStream,
1957 index_store: &dyn IndexStore,
1958 request: Box<dyn TrainingRequest>,
1959 fragment_ids: Option<Vec<u32>>,
1960 ) -> Result<CreatedIndex> {
1961 let request = request
1962 .as_any()
1963 .downcast_ref::<BTreeTrainingRequest>()
1964 .unwrap();
1965 let value_type = data
1966 .schema()
1967 .field_with_name(VALUE_COLUMN_NAME)?
1968 .data_type()
1969 .clone();
1970 let flat_index_trainer = FlatIndexMetadata::new(value_type);
1971 train_btree_index(
1972 data,
1973 &flat_index_trainer,
1974 index_store,
1975 request
1976 .parameters
1977 .zone_size
1978 .unwrap_or(DEFAULT_BTREE_BATCH_SIZE),
1979 fragment_ids,
1980 )
1981 .await?;
1982 Ok(CreatedIndex {
1983 index_details: prost_types::Any::from_msg(&pbold::BTreeIndexDetails::default())
1984 .unwrap(),
1985 index_version: BTREE_INDEX_VERSION,
1986 })
1987 }
1988
1989 async fn load_index(
1990 &self,
1991 index_store: Arc<dyn IndexStore>,
1992 _index_details: &prost_types::Any,
1993 frag_reuse_index: Option<Arc<FragReuseIndex>>,
1994 cache: &LanceCache,
1995 ) -> Result<Arc<dyn ScalarIndex>> {
1996 Ok(BTreeIndex::load(index_store, frag_reuse_index, cache).await? as Arc<dyn ScalarIndex>)
1997 }
1998}
1999
2000#[cfg(test)]
2001mod tests {
2002 use std::sync::atomic::Ordering;
2003 use std::{collections::HashMap, sync::Arc};
2004
2005 use arrow::datatypes::{Float32Type, Float64Type, Int32Type, UInt64Type};
2006 use arrow_array::FixedSizeListArray;
2007 use arrow_schema::DataType;
2008 use datafusion::{
2009 execution::{SendableRecordBatchStream, TaskContext},
2010 physical_plan::{sorts::sort::SortExec, stream::RecordBatchStreamAdapter, ExecutionPlan},
2011 };
2012 use datafusion_common::{DataFusionError, ScalarValue};
2013 use datafusion_physical_expr::{expressions::col, PhysicalSortExpr};
2014 use deepsize::DeepSizeOf;
2015 use futures::TryStreamExt;
2016 use lance_core::utils::tempfile::TempObjDir;
2017 use lance_core::{cache::LanceCache, utils::mask::RowIdTreeMap};
2018 use lance_datafusion::{chunker::break_stream, datagen::DatafusionDatagenExt};
2019 use lance_datagen::{array, gen_batch, ArrayGeneratorExt, BatchCount, RowCount};
2020 use lance_io::object_store::ObjectStore;
2021
2022 use crate::metrics::LocalMetricsCollector;
2023 use crate::{
2024 metrics::NoOpMetricsCollector,
2025 scalar::{
2026 btree::{BTreeIndex, BTREE_PAGES_NAME},
2027 flat::FlatIndexMetadata,
2028 lance_format::LanceIndexStore,
2029 IndexStore, SargableQuery, ScalarIndex, SearchResult,
2030 },
2031 };
2032
2033 use super::{
2034 part_lookup_file_path, part_page_data_file_path, train_btree_index, OrderableScalarValue,
2035 DEFAULT_BTREE_BATCH_SIZE,
2036 };
2037 #[test]
2038 fn test_scalar_value_size() {
2039 let size_of_i32 = OrderableScalarValue(ScalarValue::Int32(Some(0))).deep_size_of();
2040 let size_of_many_i32 = OrderableScalarValue(ScalarValue::FixedSizeList(Arc::new(
2041 FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2042 vec![Some(vec![Some(0); 128])],
2043 128,
2044 ),
2045 )))
2046 .deep_size_of();
2047
2048 assert!(size_of_i32 > 4);
2050 assert!(size_of_many_i32 > 128 * 4);
2051 }
2052
2053 #[tokio::test]
2054 async fn test_null_ids() {
2055 let tmpdir = TempObjDir::default();
2056 let test_store = Arc::new(LanceIndexStore::new(
2057 Arc::new(ObjectStore::local()),
2058 tmpdir.clone(),
2059 Arc::new(LanceCache::no_cache()),
2060 ));
2061
2062 let stream = gen_batch()
2064 .col(
2065 "value",
2066 array::rand::<Float32Type>().with_nulls(&[true, false, false, false, false]),
2067 )
2068 .col("_rowid", array::step::<UInt64Type>())
2069 .into_df_stream(RowCount::from(5000), BatchCount::from(10));
2070 let sub_index_trainer = FlatIndexMetadata::new(DataType::Float32);
2071
2072 train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 5000, None)
2073 .await
2074 .unwrap();
2075
2076 let index = BTreeIndex::load(test_store.clone(), None, &LanceCache::no_cache())
2077 .await
2078 .unwrap();
2079
2080 assert_eq!(index.page_lookup.null_pages.len(), 10);
2081
2082 let remap_dir = TempObjDir::default();
2083 let remap_store = Arc::new(LanceIndexStore::new(
2084 Arc::new(ObjectStore::local()),
2085 remap_dir.clone(),
2086 Arc::new(LanceCache::no_cache()),
2087 ));
2088
2089 index
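2090 // remapping with an empty mapping should leave the index contents unchanged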
2091 .remap(&HashMap::default(), remap_store.as_ref())
2092 .await
2093 .unwrap();
2094
2095 let remap_index = BTreeIndex::load(remap_store.clone(), None, &LanceCache::no_cache())
2096 .await
2097 .unwrap();
2098
2099 assert_eq!(remap_index.page_lookup, index.page_lookup);
2100
2101 let original_pages = test_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2102 let remapped_pages = remap_store.open_index_file(BTREE_PAGES_NAME).await.unwrap();
2103
2104 assert_eq!(original_pages.num_rows(), remapped_pages.num_rows());
2105
2106 let original_data = original_pages
2107 .read_record_batch(0, original_pages.num_rows() as u64)
2108 .await
2109 .unwrap();
2110 let remapped_data = remapped_pages
2111 .read_record_batch(0, remapped_pages.num_rows() as u64)
2112 .await
2113 .unwrap();
2114
2115 assert_eq!(original_data, remapped_data);
2116 }
2117
2118 #[tokio::test]
2119 async fn test_nan_ordering() {
2120 let tmpdir = TempObjDir::default();
2121 let test_store = Arc::new(LanceIndexStore::new(
2122 Arc::new(ObjectStore::local()),
2123 tmpdir.clone(),
2124 Arc::new(LanceCache::no_cache()),
2125 ));
2126
2127 let values = vec![
2128 0.0,
2129 1.0,
2130 2.0,
2131 3.0,
2132 f64::NAN,
2133 f64::NEG_INFINITY,
2134 f64::INFINITY,
2135 ];
2136
2137 let data = gen_batch()
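2138 // the generated column cycles through the values above (including NaN and
2139 // the infinities), so with 10 rows per batch and 100 batches each value
2140 // lands on every 7th row id starting from its position in the list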
2141 .col("value", array::cycle::<Float64Type>(values.clone()))
2142 .col("_rowid", array::step::<UInt64Type>())
2143 .into_df_exec(RowCount::from(10), BatchCount::from(100));
2144 let schema = data.schema();
2145 let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2146 let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2147 let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2148 let stream = break_stream(stream, 64);
2149 let stream = stream.map_err(DataFusionError::from);
2150 let stream =
2151 Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2152
2153 let sub_index_trainer = FlatIndexMetadata::new(DataType::Float64);
2154
2155 train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 64, None)
2156 .await
2157 .unwrap();
2158
2159 let index = BTreeIndex::load(test_store, None, &LanceCache::no_cache())
2160 .await
2161 .unwrap();
2162
2163 for (idx, value) in values.into_iter().enumerate() {
2164 let query = SargableQuery::Equals(ScalarValue::Float64(Some(value)));
2165 let result = index.search(&query, &NoOpMetricsCollector).await.unwrap();
2166 assert_eq!(
2167 result,
2168 SearchResult::Exact(RowIdTreeMap::from_iter(((idx as u64)..1000).step_by(7)))
2169 );
2170 }
2171 }
2172
2173 #[tokio::test]
2174 async fn test_page_cache() {
2175 let tmpdir = TempObjDir::default();
2176 let test_store = Arc::new(LanceIndexStore::new(
2177 Arc::new(ObjectStore::local()),
2178 tmpdir.clone(),
2179 Arc::new(LanceCache::no_cache()),
2180 ));
2181
2182 let data = gen_batch()
2183 .col("value", array::step::<Float32Type>())
2184 .col("_rowid", array::step::<UInt64Type>())
2185 .into_df_exec(RowCount::from(1000), BatchCount::from(10));
2186 let schema = data.schema();
2187 let sort_expr = PhysicalSortExpr::new_default(col("value", schema.as_ref()).unwrap());
2188 let plan = Arc::new(SortExec::new([sort_expr].into(), data));
2189 let stream = plan.execute(0, Arc::new(TaskContext::default())).unwrap();
2190 let stream = break_stream(stream, 64);
2191 let stream = stream.map_err(DataFusionError::from);
2192 let stream =
2193 Box::pin(RecordBatchStreamAdapter::new(schema, stream)) as SendableRecordBatchStream;
2194 let sub_index_trainer = FlatIndexMetadata::new(DataType::Float32);
2195
2196 train_btree_index(stream, &sub_index_trainer, test_store.as_ref(), 64, None)
2197 .await
2198 .unwrap();
2199
2200 let cache = Arc::new(LanceCache::with_capacity(100 * 1024 * 1024));
2201 let index = BTreeIndex::load(test_store, None, cache.as_ref())
2202 .await
2203 .unwrap();
2204
2205 let query = SargableQuery::Equals(ScalarValue::Float32(Some(0.0)));
2206 let metrics = LocalMetricsCollector::default();
2207 let query1 = index.search(&query, &metrics);
2208 let query2 = index.search(&query, &metrics);
2209 tokio::join!(query1, query2).0.unwrap();
2210 assert_eq!(metrics.parts_loaded.load(Ordering::Relaxed), 1);
2211 }
2212
2213 #[tokio::test]
2214 async fn test_fragment_btree_index_consistency() {
2215 let full_tmpdir = TempObjDir::default();
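2216 // store for the baseline index trained over the full dataset in one pass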
2217 let full_store = Arc::new(LanceIndexStore::new(
2218 Arc::new(ObjectStore::local()),
2219 full_tmpdir.clone(),
2220 Arc::new(LanceCache::no_cache()),
2221 ));
2222
2223 let fragment_tmpdir = TempObjDir::default();
2224 let fragment_store = Arc::new(LanceIndexStore::new(
2225 Arc::new(ObjectStore::local()),
2226 fragment_tmpdir.clone(),
2227 Arc::new(LanceCache::no_cache()),
2228 ));
2229
2230 let sub_index_trainer = FlatIndexMetadata::new(DataType::Int32);
2231
2232 let total_count = 2 * DEFAULT_BTREE_BATCH_SIZE;
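2233 // train the baseline index over all the data at once; the stream delivers
2234 // two batches of DEFAULT_BTREE_BATCH_SIZE sequential values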
2235 let full_data_gen = gen_batch()
2236 .col("value", array::step::<Int32Type>())
2237 .col("_rowid", array::step::<UInt64Type>())
2238 .into_df_stream(RowCount::from(total_count / 2), BatchCount::from(2));
2239 let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
2240 full_data_gen.schema(),
2241 full_data_gen,
2242 ));
2243
2244 train_btree_index(
2245 full_data_source,
2246 &sub_index_trainer,
2247 full_store.as_ref(),
2248 DEFAULT_BTREE_BATCH_SIZE,
2249 None,
2250 )
2251 .await
2252 .unwrap();
2253
2254 let half_count = DEFAULT_BTREE_BATCH_SIZE;
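2255 // now train the same data as two per-fragment partitions; fragment 1
2256 // covers the first DEFAULT_BTREE_BATCH_SIZE values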
2257 let fragment1_gen = gen_batch()
2258 .col("value", array::step::<Int32Type>())
2259 .col("_rowid", array::step::<UInt64Type>())
2260 .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
2261 let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
2262 fragment1_gen.schema(),
2263 fragment1_gen,
2264 ));
2265
2266 train_btree_index(
2267 fragment1_data_source,
2268 &sub_index_trainer,
2269 fragment_store.as_ref(),
2270 DEFAULT_BTREE_BATCH_SIZE,
2271 Some(vec![1]),
2272 )
2273 .await
2274 .unwrap();
2275
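2276 // fragment 2 covers the second DEFAULT_BTREE_BATCH_SIZE values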
2276 let start_val = DEFAULT_BTREE_BATCH_SIZE as i32;
2278 let end_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2279 let values_second_half: Vec<i32> = (start_val..end_val).collect();
2280 let row_ids_second_half: Vec<u64> = (start_val as u64..end_val as u64).collect();
2281 let fragment2_gen = gen_batch()
2282 .col("value", array::cycle::<Int32Type>(values_second_half))
2283 .col("_rowid", array::cycle::<UInt64Type>(row_ids_second_half))
2284 .into_df_stream(RowCount::from(half_count), BatchCount::from(1));
2285 let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
2286 fragment2_gen.schema(),
2287 fragment2_gen,
2288 ));
2289
2290 train_btree_index(
2291 fragment2_data_source,
2292 &sub_index_trainer,
2293 fragment_store.as_ref(),
2294 DEFAULT_BTREE_BATCH_SIZE,
2295 Some(vec![2]),
2296 )
2297 .await
2298 .unwrap();
2299
2300 let part_page_files = vec![
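2301 // partition data files for the two fragments (the test uses fragment id << 32 as the partition id)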
2302 part_page_data_file_path(1 << 32),
2303 part_page_data_file_path(2 << 32),
2304 ];
2305
2306 let part_lookup_files = vec![
2307 part_lookup_file_path(1 << 32),
2308 part_lookup_file_path(2 << 32),
2309 ];
2310
2311 super::merge_metadata_files(
2312 fragment_store.clone(),
2313 &part_page_files,
2314 &part_lookup_files,
2315 Some(1usize),
2316 )
2317 .await
2318 .unwrap();
2319
2320 let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
2322 .await
2323 .unwrap();
2324
2325 let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
2326 .await
2327 .unwrap();
2328
2329 let query_0 = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
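2330 // the merged per-fragment index must return exactly the same row ids as
2331 // the index trained in a single pass; start with the smallest value in
2332 // the first fragment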
2333 let full_result_0 = full_index
2334 .search(&query_0, &NoOpMetricsCollector)
2335 .await
2336 .unwrap();
2337 let merged_result_0 = merged_index
2338 .search(&query_0, &NoOpMetricsCollector)
2339 .await
2340 .unwrap();
2341 assert_eq!(full_result_0, merged_result_0, "Query for value 0 failed");
2342
2343 let mid_first_batch = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
2345 let query_mid_first = SargableQuery::Equals(ScalarValue::Int32(Some(mid_first_batch)));
2346 let full_result_mid_first = full_index
2347 .search(&query_mid_first, &NoOpMetricsCollector)
2348 .await
2349 .unwrap();
2350 let merged_result_mid_first = merged_index
2351 .search(&query_mid_first, &NoOpMetricsCollector)
2352 .await
2353 .unwrap();
2354 assert_eq!(
2355 full_result_mid_first, merged_result_mid_first,
2356 "Query for value {} failed",
2357 mid_first_batch
2358 );
2359
2360 let first_second_batch = DEFAULT_BTREE_BATCH_SIZE as i32;
2362 let query_first_second =
2363 SargableQuery::Equals(ScalarValue::Int32(Some(first_second_batch)));
2364 let full_result_first_second = full_index
2365 .search(&query_first_second, &NoOpMetricsCollector)
2366 .await
2367 .unwrap();
2368 let merged_result_first_second = merged_index
2369 .search(&query_first_second, &NoOpMetricsCollector)
2370 .await
2371 .unwrap();
2372 assert_eq!(
2373 full_result_first_second, merged_result_first_second,
2374 "Query for value {} failed",
2375 first_second_batch
2376 );
2377
2378 let mid_second_batch = (DEFAULT_BTREE_BATCH_SIZE + DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
2380 let query_mid_second = SargableQuery::Equals(ScalarValue::Int32(Some(mid_second_batch)));
2381
2382 let full_result_mid_second = full_index
2383 .search(&query_mid_second, &NoOpMetricsCollector)
2384 .await
2385 .unwrap();
2386 let merged_result_mid_second = merged_index
2387 .search(&query_mid_second, &NoOpMetricsCollector)
2388 .await
2389 .unwrap();
2390 assert_eq!(
2391 full_result_mid_second, merged_result_mid_second,
2392 "Query for value {} failed",
2393 mid_second_batch
2394 );
2395 }
2396
2397 #[tokio::test]
2398 async fn test_fragment_btree_index_boundary_queries() {
2399 let full_tmpdir = TempObjDir::default();
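2400 // store for the baseline index trained over the full dataset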
2401 let full_store = Arc::new(LanceIndexStore::new(
2402 Arc::new(ObjectStore::local()),
2403 full_tmpdir.clone(),
2404 Arc::new(LanceCache::no_cache()),
2405 ));
2406
2407 let fragment_tmpdir = TempObjDir::default();
2408 let fragment_store = Arc::new(LanceIndexStore::new(
2409 Arc::new(ObjectStore::local()),
2410 fragment_tmpdir.clone(),
2411 Arc::new(LanceCache::no_cache()),
2412 ));
2413
2414 let sub_index_trainer = FlatIndexMetadata::new(DataType::Int32);
2415
2416 let total_count = 3 * DEFAULT_BTREE_BATCH_SIZE;
2418
2419 let full_data_gen = gen_batch()
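2420 // three batches of sequential values, one per eventual fragment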
2421 .col("value", array::step::<Int32Type>())
2422 .col("_rowid", array::step::<UInt64Type>())
2423 .into_df_stream(RowCount::from(total_count / 3), BatchCount::from(3));
2424 let full_data_source = Box::pin(RecordBatchStreamAdapter::new(
2425 full_data_gen.schema(),
2426 full_data_gen,
2427 ));
2428
2429 train_btree_index(
2430 full_data_source,
2431 &sub_index_trainer,
2432 full_store.as_ref(),
2433 DEFAULT_BTREE_BATCH_SIZE,
2434 None,
2435 )
2436 .await
2437 .unwrap();
2438
2439 let fragment_size = DEFAULT_BTREE_BATCH_SIZE;
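2440 // train the same data as three per-fragment partitions; fragment 1 holds
2441 // the first DEFAULT_BTREE_BATCH_SIZE values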
2442 let fragment1_gen = gen_batch()
2443 .col("value", array::step::<Int32Type>())
2444 .col("_rowid", array::step::<UInt64Type>())
2445 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2446 let fragment1_data_source = Box::pin(RecordBatchStreamAdapter::new(
2447 fragment1_gen.schema(),
2448 fragment1_gen,
2449 ));
2450
2451 train_btree_index(
2452 fragment1_data_source,
2453 &sub_index_trainer,
2454 fragment_store.as_ref(),
2455 DEFAULT_BTREE_BATCH_SIZE,
2456 Some(vec![1]),
2457 )
2458 .await
2459 .unwrap();
2460
2461 let start_val2 = DEFAULT_BTREE_BATCH_SIZE as i32;
2463 let end_val2 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2464 let values_fragment2: Vec<i32> = (start_val2..end_val2).collect();
2465 let row_ids_fragment2: Vec<u64> = (start_val2 as u64..end_val2 as u64).collect();
2466 let fragment2_gen = gen_batch()
2467 .col("value", array::cycle::<Int32Type>(values_fragment2))
2468 .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment2))
2469 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2470 let fragment2_data_source = Box::pin(RecordBatchStreamAdapter::new(
2471 fragment2_gen.schema(),
2472 fragment2_gen,
2473 ));
2474
2475 train_btree_index(
2476 fragment2_data_source,
2477 &sub_index_trainer,
2478 fragment_store.as_ref(),
2479 DEFAULT_BTREE_BATCH_SIZE,
2480 Some(vec![2]),
2481 )
2482 .await
2483 .unwrap();
2484
2485 let start_val3 = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2487 let end_val3 = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2488 let values_fragment3: Vec<i32> = (start_val3..end_val3).collect();
2489 let row_ids_fragment3: Vec<u64> = (start_val3 as u64..end_val3 as u64).collect();
2490 let fragment3_gen = gen_batch()
2491 .col("value", array::cycle::<Int32Type>(values_fragment3))
2492 .col("_rowid", array::cycle::<UInt64Type>(row_ids_fragment3))
2493 .into_df_stream(RowCount::from(fragment_size), BatchCount::from(1));
2494 let fragment3_data_source = Box::pin(RecordBatchStreamAdapter::new(
2495 fragment3_gen.schema(),
2496 fragment3_gen,
2497 ));
2498
2499 train_btree_index(
2500 fragment3_data_source,
2501 &sub_index_trainer,
2502 fragment_store.as_ref(),
2503 DEFAULT_BTREE_BATCH_SIZE,
2504 Some(vec![3]),
2505 )
2506 .await
2507 .unwrap();
2508
2509 let part_page_files = vec![
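2510 // partition files for the three fragments, again keyed by fragment id << 32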
2511 part_page_data_file_path(1 << 32),
2512 part_page_data_file_path(2 << 32),
2513 part_page_data_file_path(3 << 32),
2514 ];
2515
2516 let part_lookup_files = vec![
2517 part_lookup_file_path(1 << 32),
2518 part_lookup_file_path(2 << 32),
2519 part_lookup_file_path(3 << 32),
2520 ];
2521
2522 super::merge_metadata_files(
2523 fragment_store.clone(),
2524 &part_page_files,
2525 &part_lookup_files,
2526 Some(1usize),
2527 )
2528 .await
2529 .unwrap();
2530
2531 let full_index = BTreeIndex::load(full_store.clone(), None, &LanceCache::no_cache())
2533 .await
2534 .unwrap();
2535
2536 let merged_index = BTreeIndex::load(fragment_store.clone(), None, &LanceCache::no_cache())
2537 .await
2538 .unwrap();
2539
2540 let query_min = SargableQuery::Equals(ScalarValue::Int32(Some(0)));
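2541 // every query below is run against both the single-pass index and the
2542 // merged per-fragment index and the results must match; start with the
2543 // global minimum value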
2544 let full_result_min = full_index
2545 .search(&query_min, &NoOpMetricsCollector)
2546 .await
2547 .unwrap();
2548 let merged_result_min = merged_index
2549 .search(&query_min, &NoOpMetricsCollector)
2550 .await
2551 .unwrap();
2552 assert_eq!(
2553 full_result_min, merged_result_min,
2554 "Query for minimum value 0 failed"
2555 );
2556
2557 let max_val = (3 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2559 let query_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val)));
2560 let full_result_max = full_index
2561 .search(&query_max, &NoOpMetricsCollector)
2562 .await
2563 .unwrap();
2564 let merged_result_max = merged_index
2565 .search(&query_max, &NoOpMetricsCollector)
2566 .await
2567 .unwrap();
2568 assert_eq!(
2569 full_result_max, merged_result_max,
2570 "Query for maximum value {} failed",
2571 max_val
2572 );
2573
2574 let fragment1_last = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2576 let query_frag1_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment1_last)));
2577 let full_result_frag1_last = full_index
2578 .search(&query_frag1_last, &NoOpMetricsCollector)
2579 .await
2580 .unwrap();
2581 let merged_result_frag1_last = merged_index
2582 .search(&query_frag1_last, &NoOpMetricsCollector)
2583 .await
2584 .unwrap();
2585 assert_eq!(
2586 full_result_frag1_last, merged_result_frag1_last,
2587 "Query for fragment 1 last value {} failed",
2588 fragment1_last
2589 );
2590
2591 let fragment2_first = DEFAULT_BTREE_BATCH_SIZE as i32;
2593 let query_frag2_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_first)));
2594 let full_result_frag2_first = full_index
2595 .search(&query_frag2_first, &NoOpMetricsCollector)
2596 .await
2597 .unwrap();
2598 let merged_result_frag2_first = merged_index
2599 .search(&query_frag2_first, &NoOpMetricsCollector)
2600 .await
2601 .unwrap();
2602 assert_eq!(
2603 full_result_frag2_first, merged_result_frag2_first,
2604 "Query for fragment 2 first value {} failed",
2605 fragment2_first
2606 );
2607
2608 let fragment2_last = (2 * DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2610 let query_frag2_last = SargableQuery::Equals(ScalarValue::Int32(Some(fragment2_last)));
2611 let full_result_frag2_last = full_index
2612 .search(&query_frag2_last, &NoOpMetricsCollector)
2613 .await
2614 .unwrap();
2615 let merged_result_frag2_last = merged_index
2616 .search(&query_frag2_last, &NoOpMetricsCollector)
2617 .await
2618 .unwrap();
2619 assert_eq!(
2620 full_result_frag2_last, merged_result_frag2_last,
2621 "Query for fragment 2 last value {} failed",
2622 fragment2_last
2623 );
2624
2625 let fragment3_first = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2627 let query_frag3_first = SargableQuery::Equals(ScalarValue::Int32(Some(fragment3_first)));
2628 let full_result_frag3_first = full_index
2629 .search(&query_frag3_first, &NoOpMetricsCollector)
2630 .await
2631 .unwrap();
2632 let merged_result_frag3_first = merged_index
2633 .search(&query_frag3_first, &NoOpMetricsCollector)
2634 .await
2635 .unwrap();
2636 assert_eq!(
2637 full_result_frag3_first, merged_result_frag3_first,
2638 "Query for fragment 3 first value {} failed",
2639 fragment3_first
2640 );
2641
2642 let query_below_min = SargableQuery::Equals(ScalarValue::Int32(Some(-1)));
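2643 // probes outside the indexed range (below the minimum here, above the
2644 // maximum just after) must also return identical results from both indexes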
2646 let full_result_below = full_index
2647 .search(&query_below_min, &NoOpMetricsCollector)
2648 .await
2649 .unwrap();
2650 let merged_result_below = merged_index
2651 .search(&query_below_min, &NoOpMetricsCollector)
2652 .await
2653 .unwrap();
2654 assert_eq!(
2655 full_result_below, merged_result_below,
2656 "Query for value below minimum (-1) failed"
2657 );
2658
2659 let query_above_max = SargableQuery::Equals(ScalarValue::Int32(Some(max_val + 1)));
2661 let full_result_above = full_index
2662 .search(&query_above_max, &NoOpMetricsCollector)
2663 .await
2664 .unwrap();
2665 let merged_result_above = merged_index
2666 .search(&query_above_max, &NoOpMetricsCollector)
2667 .await
2668 .unwrap();
2669 assert_eq!(
2670 full_result_above,
2671 merged_result_above,
2672 "Query for value above maximum ({}) failed",
2673 max_val + 1
2674 );
2675
2676 let range_start = (DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
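2677 // range queries: first a range that straddles the boundary between
2678 // fragment 1 and fragment 2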
2680 let range_end = (DEFAULT_BTREE_BATCH_SIZE + 100) as i32;
2681 let query_cross_frag = SargableQuery::Range(
2682 std::collections::Bound::Included(ScalarValue::Int32(Some(range_start))),
2683 std::collections::Bound::Excluded(ScalarValue::Int32(Some(range_end))),
2684 );
2685 let full_result_cross = full_index
2686 .search(&query_cross_frag, &NoOpMetricsCollector)
2687 .await
2688 .unwrap();
2689 let merged_result_cross = merged_index
2690 .search(&query_cross_frag, &NoOpMetricsCollector)
2691 .await
2692 .unwrap();
2693 assert_eq!(
2694 full_result_cross, merged_result_cross,
2695 "Cross-fragment range query [{}, {}] failed",
2696 range_start, range_end
2697 );
2698
2699 let single_frag_start = 100i32;
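2700 // a range fully contained within fragment 1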
2701 let single_frag_end = 200i32;
2702 let query_single_frag = SargableQuery::Range(
2703 std::collections::Bound::Included(ScalarValue::Int32(Some(single_frag_start))),
2704 std::collections::Bound::Excluded(ScalarValue::Int32(Some(single_frag_end))),
2705 );
2706 let full_result_single = full_index
2707 .search(&query_single_frag, &NoOpMetricsCollector)
2708 .await
2709 .unwrap();
2710 let merged_result_single = merged_index
2711 .search(&query_single_frag, &NoOpMetricsCollector)
2712 .await
2713 .unwrap();
2714 assert_eq!(
2715 full_result_single, merged_result_single,
2716 "Single fragment range query [{}, {}] failed",
2717 single_frag_start, single_frag_end
2718 );
2719
2720 let large_range_start = 100i32;
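2721 // a large range spanning all three fragments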
2722 let large_range_end = (3 * DEFAULT_BTREE_BATCH_SIZE - 100) as i32;
2723 let query_large_range = SargableQuery::Range(
2724 std::collections::Bound::Included(ScalarValue::Int32(Some(large_range_start))),
2725 std::collections::Bound::Excluded(ScalarValue::Int32(Some(large_range_end))),
2726 );
2727 let full_result_large = full_index
2728 .search(&query_large_range, &NoOpMetricsCollector)
2729 .await
2730 .unwrap();
2731 let merged_result_large = merged_index
2732 .search(&query_large_range, &NoOpMetricsCollector)
2733 .await
2734 .unwrap();
2735 assert_eq!(
2736 full_result_large, merged_result_large,
2737 "Large range query [{}, {}] failed",
2738 large_range_start, large_range_end
2739 );
2740
2741 let lt_val = (DEFAULT_BTREE_BATCH_SIZE / 2) as i32;
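2742 // the remaining queries emulate <, >, <= and >= comparisons with
2743 // half-open and closed range bounds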
2745 let query_lt = SargableQuery::Range(
2746 std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
2747 std::collections::Bound::Excluded(ScalarValue::Int32(Some(lt_val))),
2748 );
2749 let full_result_lt = full_index
2750 .search(&query_lt, &NoOpMetricsCollector)
2751 .await
2752 .unwrap();
2753 let merged_result_lt = merged_index
2754 .search(&query_lt, &NoOpMetricsCollector)
2755 .await
2756 .unwrap();
2757 assert_eq!(
2758 full_result_lt, merged_result_lt,
2759 "Less than query (<{}) failed",
2760 lt_val
2761 );
2762
2763 let gt_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2765 let max_range_val = (3 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2766 let query_gt = SargableQuery::Range(
2767 std::collections::Bound::Excluded(ScalarValue::Int32(Some(gt_val))),
2768 std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
2769 );
2770 let full_result_gt = full_index
2771 .search(&query_gt, &NoOpMetricsCollector)
2772 .await
2773 .unwrap();
2774 let merged_result_gt = merged_index
2775 .search(&query_gt, &NoOpMetricsCollector)
2776 .await
2777 .unwrap();
2778 assert_eq!(
2779 full_result_gt, merged_result_gt,
2780 "Greater than query (>{}) failed",
2781 gt_val
2782 );
2783
2784 let lte_val = (DEFAULT_BTREE_BATCH_SIZE - 1) as i32;
2786 let query_lte = SargableQuery::Range(
2787 std::collections::Bound::Included(ScalarValue::Int32(Some(0))),
2788 std::collections::Bound::Included(ScalarValue::Int32(Some(lte_val))),
2789 );
2790 let full_result_lte = full_index
2791 .search(&query_lte, &NoOpMetricsCollector)
2792 .await
2793 .unwrap();
2794 let merged_result_lte = merged_index
2795 .search(&query_lte, &NoOpMetricsCollector)
2796 .await
2797 .unwrap();
2798 assert_eq!(
2799 full_result_lte, merged_result_lte,
2800 "Less than or equal query (<={}) failed",
2801 lte_val
2802 );
2803
2804 let gte_val = (2 * DEFAULT_BTREE_BATCH_SIZE) as i32;
2806 let query_gte = SargableQuery::Range(
2807 std::collections::Bound::Included(ScalarValue::Int32(Some(gte_val))),
2808 std::collections::Bound::Excluded(ScalarValue::Int32(Some(max_range_val))),
2809 );
2810 let full_result_gte = full_index
2811 .search(&query_gte, &NoOpMetricsCollector)
2812 .await
2813 .unwrap();
2814 let merged_result_gte = merged_index
2815 .search(&query_gte, &NoOpMetricsCollector)
2816 .await
2817 .unwrap();
2818 assert_eq!(
2819 full_result_gte, merged_result_gte,
2820 "Greater than or equal query (>={}) failed",
2821 gte_val
2822 );
2823 }
2824
2825 #[test]
2826 fn test_extract_partition_id() {
2827 assert_eq!(
2829 super::extract_partition_id("part_123_page_data.lance").unwrap(),
2830 123
2831 );
2832 assert_eq!(
2833 super::extract_partition_id("part_456_page_lookup.lance").unwrap(),
2834 456
2835 );
2836 assert_eq!(
2837 super::extract_partition_id("part_4294967296_page_data.lance").unwrap(),
2838 4294967296
2839 );
2840
2841 assert!(super::extract_partition_id("invalid_filename.lance").is_err());
2843 assert!(super::extract_partition_id("part_abc_page_data.lance").is_err());
2844 assert!(super::extract_partition_id("part_123").is_err());
2845 assert!(super::extract_partition_id("part_").is_err());
2846 }
2847
2848 #[tokio::test]
2849 async fn test_cleanup_partition_files() {
2850 let tmpdir = TempObjDir::default();
2852 let test_store: Arc<dyn crate::scalar::IndexStore> = Arc::new(LanceIndexStore::new(
2853 Arc::new(ObjectStore::local()),
2854 tmpdir.clone(),
2855 Arc::new(LanceCache::no_cache()),
2856 ));
2857
2858 let lookup_files = vec![
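2859 // a mix of valid partition lookup files and a name that does not match the partition pattern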
2860 "part_123_page_lookup.lance".to_string(),
2861 "invalid_lookup_file.lance".to_string(),
2862 "part_456_page_lookup.lance".to_string(),
2863 ];
2864
2865 let page_files = vec![
2866 "part_123_page_data.lance".to_string(),
2867 "invalid_page_file.lance".to_string(),
2868 "part_456_page_data.lance".to_string(),
2869 ];
2870
2871 super::cleanup_partition_files(&test_store, &lookup_files, &page_files).await;
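2872 // none of these files actually exist; the call should still complete
2873 // without panicking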
2874 }
2875}