Skip to main content

lance_index/
frag_reuse.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use crate::{Index, IndexType};
5use arrow_array::cast::AsArray;
6use arrow_array::types::UInt64Type;
7use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
8use async_trait::async_trait;
9use deepsize::{Context, DeepSizeOf};
10use itertools::Itertools;
11use lance_core::utils::mask::RowAddrTreeMap;
12use lance_core::{Error, Result};
13use lance_table::format::pb::fragment_reuse_index_details::InlineContent;
14use lance_table::format::{ExternalFile, Fragment, pb};
15use roaring::{RoaringBitmap, RoaringTreemap};
16use serde::{Deserialize, Serialize};
17use std::{any::Any, collections::HashMap, sync::Arc};
18use uuid::Uuid;
19
/// Name of the fragment reuse index.
pub const FRAG_REUSE_INDEX_NAME: &str = "__lance_frag_reuse";
/// File name for fragment reuse index details.
// NOTE(review): presumably used when the details are stored in an external file — confirm.
pub const FRAG_REUSE_DETAILS_FILE_NAME: &str = "details.binpb";
22
23#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
24pub struct FragDigest {
25    pub id: u64,
26    pub physical_rows: usize,
27    pub num_deleted_rows: usize,
28}
29
30impl From<&FragDigest> for pb::fragment_reuse_index_details::FragmentDigest {
31    fn from(digest: &FragDigest) -> Self {
32        Self {
33            id: digest.id,
34            physical_rows: digest.physical_rows as u64,
35            num_deleted_rows: digest.num_deleted_rows as u64,
36        }
37    }
38}
39
40impl From<&Fragment> for FragDigest {
41    fn from(fragment: &Fragment) -> Self {
42        Self {
43            id: fragment.id,
44            physical_rows: fragment
45                .physical_rows
46                .expect("Fragment doesn't have physical rows recorded"),
47            num_deleted_rows: fragment
48                .deletion_file
49                .as_ref()
50                .and_then(|d| d.num_deleted_rows)
51                .unwrap_or(0),
52        }
53    }
54}
55
56impl TryFrom<pb::fragment_reuse_index_details::FragmentDigest> for FragDigest {
57    type Error = Error;
58
59    fn try_from(digest: pb::fragment_reuse_index_details::FragmentDigest) -> Result<Self> {
60        Ok(Self {
61            id: digest.id,
62            physical_rows: digest.physical_rows as usize,
63            num_deleted_rows: digest.num_deleted_rows as usize,
64        })
65    }
66}
67
68#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
69pub struct FragReuseGroup {
70    pub changed_row_addrs: Vec<u8>,
71    pub old_frags: Vec<FragDigest>,
72    pub new_frags: Vec<FragDigest>,
73}
74
75impl From<&FragReuseGroup> for pb::fragment_reuse_index_details::Group {
76    fn from(group: &FragReuseGroup) -> Self {
77        Self {
78            changed_row_addrs: group.changed_row_addrs.clone(),
79            old_fragments: group.old_frags.iter().map(|f| f.into()).collect(),
80            new_fragments: group.new_frags.iter().map(|f| f.into()).collect(),
81        }
82    }
83}
84
85impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
86    type Error = Error;
87
88    fn try_from(group: pb::fragment_reuse_index_details::Group) -> Result<Self> {
89        Ok(Self {
90            changed_row_addrs: group.changed_row_addrs,
91            old_frags: group
92                .old_fragments
93                .into_iter()
94                .map(FragDigest::try_from)
95                .collect::<Result<_>>()?,
96            new_frags: group
97                .new_fragments
98                .into_iter()
99                .map(FragDigest::try_from)
100                .collect::<Result<_>>()?,
101        })
102    }
103}
104
105#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
106pub struct FragReuseVersion {
107    pub dataset_version: u64,
108    pub groups: Vec<FragReuseGroup>,
109}
110
111impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
112    fn from(version: &FragReuseVersion) -> Self {
113        Self {
114            dataset_version: version.dataset_version,
115            groups: version.groups.iter().map(|g| g.into()).collect(),
116        }
117    }
118}
119
120impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
121    type Error = Error;
122
123    fn try_from(version: pb::fragment_reuse_index_details::Version) -> Result<Self> {
124        Ok(Self {
125            dataset_version: version.dataset_version,
126            groups: version
127                .groups
128                .into_iter()
129                .map(FragReuseGroup::try_from)
130                .collect::<Result<_>>()?,
131        })
132    }
133}
134
135impl FragReuseVersion {
136    pub fn old_frag_ids(&self) -> Vec<u64> {
137        self.groups
138            .iter()
139            .flat_map(|g| g.old_frags.iter().map(|f| f.id))
140            .collect::<Vec<_>>()
141    }
142
143    pub fn new_frag_ids(&self) -> Vec<u64> {
144        self.groups
145            .iter()
146            .flat_map(|g| g.new_frags.iter().map(|f| f.id))
147            .collect::<Vec<_>>()
148    }
149
150    pub fn new_frag_bitmap(&self) -> RoaringBitmap {
151        RoaringBitmap::from_iter(self.new_frag_ids().iter().map(|&id| id as u32))
152    }
153}
154
155#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
156pub enum FragReuseIndexDetailsContentType {
157    Inline(FragReuseIndexDetails),
158    External(ExternalFile),
159}
160
161#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
162pub struct FragReuseIndexDetails {
163    pub versions: Vec<FragReuseVersion>,
164}
165
166impl From<&FragReuseIndexDetails> for InlineContent {
167    fn from(details: &FragReuseIndexDetails) -> Self {
168        Self {
169            versions: details
170                .versions
171                .iter()
172                .map(|m| m.into())
173                // sort from oldest to latest version
174                .sorted_by_key(|v: &pb::fragment_reuse_index_details::Version| v.dataset_version)
175                .collect(),
176        }
177    }
178}
179
180impl TryFrom<InlineContent> for FragReuseIndexDetails {
181    type Error = Error;
182
183    fn try_from(content: InlineContent) -> Result<Self> {
184        Ok(Self {
185            versions: content
186                .versions
187                .into_iter()
188                .map(|m| m.try_into())
189                .collect::<Result<Vec<_>>>()?,
190        })
191    }
192}
193
194impl FragReuseIndexDetails {
195    pub fn new_frag_bitmap(&self) -> RoaringBitmap {
196        RoaringBitmap::from_iter(
197            self.versions
198                .iter()
199                .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
200        )
201    }
202}
203
204/// An index that stores row ID maps.
205/// A row ID map describes the mapping from old row address to new address after compactions.
206/// Each version contains the mapping for one round of compaction.
207#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
208pub struct FragReuseIndex {
209    pub uuid: Uuid,
210    pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
211    pub details: FragReuseIndexDetails,
212}
213
214impl DeepSizeOf for FragReuseIndex {
215    fn deep_size_of_children(&self, cx: &mut Context) -> usize {
216        self.row_id_maps.deep_size_of_children(cx) + self.details.deep_size_of_children(cx)
217    }
218}
219
220impl FragReuseIndex {
221    pub fn new(
222        uuid: Uuid,
223        row_id_maps: Vec<HashMap<u64, Option<u64>>>,
224        details: FragReuseIndexDetails,
225    ) -> Self {
226        Self {
227            uuid,
228            row_id_maps,
229            details,
230        }
231    }
232
233    pub fn remap_row_id(&self, row_id: u64) -> Option<u64> {
234        let mut mapped_value = Some(row_id);
235        for row_id_map in self.row_id_maps.iter() {
236            if mapped_value.is_some() {
237                mapped_value = row_id_map
238                    .get(&mapped_value.unwrap())
239                    .copied()
240                    .unwrap_or(mapped_value);
241            }
242        }
243
244        mapped_value
245    }
246
247    pub fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
248        RowAddrTreeMap::from_iter(row_addrs.row_addrs().unwrap().filter_map(|addr| {
249            let addr_as_u64 = u64::from(addr);
250            self.remap_row_id(addr_as_u64)
251        }))
252    }
253
254    pub fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
255        RoaringTreemap::from_iter(row_ids.iter().filter_map(|addr| self.remap_row_id(addr)))
256    }
257
258    /// Remap a record batch that contains a row_id column at index `row_id_idx`
259    /// Currently this assumes there are only 2 columns in the schema,
260    /// which is the case for all indexes.
261    /// For example, for btree, the schema is (value, row_id).
262    /// For vector index storage, the schema is (row_id, vector).
263    pub fn remap_row_ids_record_batch(
264        &self,
265        batch: RecordBatch,
266        row_id_idx: usize,
267    ) -> Result<RecordBatch> {
268        assert_eq!(batch.schema().fields().len(), 2);
269        let other_column_idx = 1 - row_id_idx;
270        let row_ids = batch.column(row_id_idx).as_primitive::<UInt64Type>();
271        let (val_indices, new_row_ids): (Vec<u64>, Vec<u64>) = row_ids
272            .values()
273            .iter()
274            .enumerate()
275            .filter_map(|(idx, old_id)| {
276                self.remap_row_id(*old_id)
277                    .map(|new_id| (idx as u64, new_id))
278            })
279            .unzip();
280        let new_val_indices = UInt64Array::from_iter_values(val_indices);
281        let new_vals =
282            arrow_select::take::take(batch.column(other_column_idx), &new_val_indices, None)?;
283
284        let mut batch_data: Vec<(usize, ArrayRef)> = vec![
285            (
286                row_id_idx,
287                Arc::new(UInt64Array::from_iter_values(new_row_ids)) as ArrayRef,
288            ),
289            (other_column_idx, Arc::new(new_vals)),
290        ];
291        batch_data.sort_by_key(|(i, _)| *i);
292        Ok(RecordBatch::try_new(
293            batch.schema(),
294            batch_data.into_iter().map(|(_, item)| item).collect(),
295        )?)
296    }
297
298    pub fn remap_row_ids_array(&self, array: ArrayRef) -> PrimitiveArray<UInt64Type> {
299        let primitive_array = array
300            .as_any()
301            .downcast_ref::<PrimitiveArray<UInt64Type>>()
302            .expect("expected row IDs to be uint64 array");
303        (0..primitive_array.len())
304            .map(|i| {
305                if primitive_array.is_null(i) {
306                    None
307                } else {
308                    self.remap_row_id(primitive_array.value(i))
309                }
310            })
311            .collect()
312    }
313
314    pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
315        for version in self.details.versions.iter() {
316            for group in version.groups.iter() {
317                let mut removed = 0;
318                for old_frag in group.old_frags.iter() {
319                    if fragment_bitmap.remove(old_frag.id as u32) {
320                        removed += 1;
321                    }
322                }
323
324                if removed > 0 {
325                    if removed != group.old_frags.len() {
326                        // Straddle: the index covered only part of this rewrite
327                        // group. Caused by the bug fixed in
328                        // <https://github.com/lance-format/lance/pull/6610>.
329                        // We've already removed the indexed old_frags from the
330                        // bitmap above; deliberately do NOT insert new_frags,
331                        // since the merged fragment also contains rows that
332                        // were never indexed. Affected rows fall through to
333                        // flat scan until the next optimize_indices. The fix
334                        // is persisted on the next write via build_manifest.
335                        tracing::warn!(
336                            "Healing straddling fragment-reuse rewrite group in index bitmap: \
337                             group {:?} was only partially indexed ({} of {} old fragments). \
338                             Affected rows will use flat scan until the next optimize_indices.",
339                            group.old_frags,
340                            removed,
341                            group.old_frags.len(),
342                        );
343                        continue;
344                    }
345
346                    for new_frag in group.new_frags.iter() {
347                        fragment_bitmap.insert(new_frag.id as u32);
348                    }
349                }
350            }
351        }
352        Ok(())
353    }
354}
355
356#[derive(Serialize)]
357struct FragReuseStatistics {
358    num_versions: usize,
359}
360
361#[async_trait]
362impl Index for FragReuseIndex {
363    fn as_any(&self) -> &dyn Any {
364        self
365    }
366
367    fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
368        self
369    }
370
371    fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
372        Err(Error::not_supported_source(
373            "FragReuseIndex is not a vector index".into(),
374        ))
375    }
376
377    fn statistics(&self) -> Result<serde_json::Value> {
378        let stats = FragReuseStatistics {
379            num_versions: self.details.versions.len(),
380        };
381        serde_json::to_value(stats).map_err(|e| {
382            Error::internal(format!(
383                "failed to serialize fragment reuse index statistics: {}",
384                e
385            ))
386        })
387    }
388
389    async fn prewarm(&self) -> Result<()> {
390        Ok(())
391    }
392
393    fn index_type(&self) -> IndexType {
394        IndexType::FragmentReuse
395    }
396
397    async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
398        unimplemented!()
399    }
400}
401
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a one-row digest with no deletions for the given fragment id.
    fn digest(id: u64) -> FragDigest {
        FragDigest {
            id,
            physical_rows: 1,
            num_deleted_rows: 0,
        }
    }

    /// Builds a version holding a single group that lists `old_id` as its old fragment
    /// and `new_ids` as its new fragments.
    fn single_group_version(
        dataset_version: u64,
        changed_row_addrs: Vec<u8>,
        old_id: u64,
        new_ids: [u64; 2],
    ) -> FragReuseVersion {
        FragReuseVersion {
            dataset_version,
            groups: vec![FragReuseGroup {
                changed_row_addrs,
                old_frags: vec![digest(old_id)],
                new_frags: new_ids.map(digest).to_vec(),
            }],
        }
    }

    #[tokio::test]
    async fn test_serialize_deserialize_index_details() {
        // Versions are listed newest-first on purpose, so the conversion has to reorder them.
        let details = FragReuseIndexDetails {
            versions: vec![
                single_group_version(2, vec![1, 2, 3], 1, [2, 3]),
                single_group_version(1, vec![4, 5, 6], 2, [4, 5]),
            ],
        };

        // To protobuf and back again.
        let inline_content = InlineContent::from(&details);
        let roundtrip_details = FragReuseIndexDetails::try_from(inline_content).unwrap();

        // Expected content after the roundtrip, oldest dataset version first:
        // (dataset_version, changed_row_addrs, old fragment id, new fragment ids)
        let expected: [(u64, Vec<u8>, u64, [u64; 2]); 2] =
            [(1, vec![4, 5, 6], 2, [4, 5]), (2, vec![1, 2, 3], 1, [2, 3])];

        assert_eq!(roundtrip_details.versions.len(), 2);

        for (version, (dataset_version, changed_row_addrs, old_id, new_ids)) in
            roundtrip_details.versions.iter().zip(expected)
        {
            assert_eq!(version.dataset_version, dataset_version);
            let group = &version.groups[0];
            assert_eq!(group.changed_row_addrs, changed_row_addrs);
            assert_eq!(group.new_frags, new_ids.map(digest).to_vec());
            assert_eq!(group.old_frags, vec![digest(old_id)]);
        }
    }
}
531}