Skip to main content

lance_table/system_index/
frag_reuse.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::cast::AsArray;
7use arrow_array::types::UInt64Type;
8use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
9use lance_core::deepsize::{Context, DeepSizeOf};
10use lance_core::{Error, Result};
11use lance_select::RowAddrTreeMap;
12use roaring::{RoaringBitmap, RoaringTreemap};
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16use crate::format::pb::fragment_reuse_index_details::InlineContent;
17use crate::format::{ExternalFile, Fragment, pb};
18
/// Reserved name string for the fragment reuse system index
/// (NOTE(review): presumably the name the index is registered under — confirm at call sites).
pub const FRAG_REUSE_INDEX_NAME: &str = "__lance_frag_reuse";
/// File name for serialized fragment reuse details
/// (NOTE(review): presumably used when details are stored externally rather than inline — confirm).
pub const FRAG_REUSE_DETAILS_FILE_NAME: &str = "details.binpb";
21
22#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
23pub struct FragDigest {
24    pub id: u64,
25    pub physical_rows: usize,
26    pub num_deleted_rows: usize,
27}
28
29impl From<&FragDigest> for pb::fragment_reuse_index_details::FragmentDigest {
30    fn from(digest: &FragDigest) -> Self {
31        Self {
32            id: digest.id,
33            physical_rows: digest.physical_rows as u64,
34            num_deleted_rows: digest.num_deleted_rows as u64,
35        }
36    }
37}
38
39impl From<&Fragment> for FragDigest {
40    fn from(fragment: &Fragment) -> Self {
41        Self {
42            id: fragment.id,
43            physical_rows: fragment
44                .physical_rows
45                .expect("Fragment doesn't have physical rows recorded"),
46            num_deleted_rows: fragment
47                .deletion_file
48                .as_ref()
49                .and_then(|d| d.num_deleted_rows)
50                .unwrap_or(0),
51        }
52    }
53}
54
55impl TryFrom<pb::fragment_reuse_index_details::FragmentDigest> for FragDigest {
56    type Error = Error;
57
58    fn try_from(digest: pb::fragment_reuse_index_details::FragmentDigest) -> Result<Self> {
59        Ok(Self {
60            id: digest.id,
61            physical_rows: digest.physical_rows as usize,
62            num_deleted_rows: digest.num_deleted_rows as usize,
63        })
64    }
65}
66
67#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
68pub struct FragReuseGroup {
69    pub changed_row_addrs: Vec<u8>,
70    pub old_frags: Vec<FragDigest>,
71    pub new_frags: Vec<FragDigest>,
72}
73
74impl From<&FragReuseGroup> for pb::fragment_reuse_index_details::Group {
75    fn from(group: &FragReuseGroup) -> Self {
76        Self {
77            changed_row_addrs: group.changed_row_addrs.clone(),
78            old_fragments: group.old_frags.iter().map(|f| f.into()).collect(),
79            new_fragments: group.new_frags.iter().map(|f| f.into()).collect(),
80        }
81    }
82}
83
84impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
85    type Error = Error;
86
87    fn try_from(group: pb::fragment_reuse_index_details::Group) -> Result<Self> {
88        Ok(Self {
89            changed_row_addrs: group.changed_row_addrs,
90            old_frags: group
91                .old_fragments
92                .into_iter()
93                .map(FragDigest::try_from)
94                .collect::<Result<_>>()?,
95            new_frags: group
96                .new_fragments
97                .into_iter()
98                .map(FragDigest::try_from)
99                .collect::<Result<_>>()?,
100        })
101    }
102}
103
104#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
105pub struct FragReuseVersion {
106    pub dataset_version: u64,
107    pub groups: Vec<FragReuseGroup>,
108}
109
110impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
111    fn from(version: &FragReuseVersion) -> Self {
112        Self {
113            dataset_version: version.dataset_version,
114            groups: version.groups.iter().map(|g| g.into()).collect(),
115        }
116    }
117}
118
119impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
120    type Error = Error;
121
122    fn try_from(version: pb::fragment_reuse_index_details::Version) -> Result<Self> {
123        Ok(Self {
124            dataset_version: version.dataset_version,
125            groups: version
126                .groups
127                .into_iter()
128                .map(FragReuseGroup::try_from)
129                .collect::<Result<_>>()?,
130        })
131    }
132}
133
134impl FragReuseVersion {
135    pub fn old_frag_ids(&self) -> Vec<u64> {
136        self.groups
137            .iter()
138            .flat_map(|g| g.old_frags.iter().map(|f| f.id))
139            .collect::<Vec<_>>()
140    }
141
142    pub fn new_frag_ids(&self) -> Vec<u64> {
143        self.groups
144            .iter()
145            .flat_map(|g| g.new_frags.iter().map(|f| f.id))
146            .collect::<Vec<_>>()
147    }
148
149    pub fn new_frag_bitmap(&self) -> RoaringBitmap {
150        RoaringBitmap::from_iter(self.new_frag_ids().iter().map(|&id| id as u32))
151    }
152}
153
154#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
155pub enum FragReuseIndexDetailsContentType {
156    Inline(FragReuseIndexDetails),
157    External(ExternalFile),
158}
159
160#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
161pub struct FragReuseIndexDetails {
162    pub versions: Vec<FragReuseVersion>,
163}
164
165impl From<&FragReuseIndexDetails> for InlineContent {
166    fn from(details: &FragReuseIndexDetails) -> Self {
167        let mut versions: Vec<pb::fragment_reuse_index_details::Version> =
168            details.versions.iter().map(|m| m.into()).collect();
169        // sort from oldest to latest version
170        versions.sort_by_key(|v| v.dataset_version);
171        Self { versions }
172    }
173}
174
175impl TryFrom<InlineContent> for FragReuseIndexDetails {
176    type Error = Error;
177
178    fn try_from(content: InlineContent) -> Result<Self> {
179        Ok(Self {
180            versions: content
181                .versions
182                .into_iter()
183                .map(|m| m.try_into())
184                .collect::<Result<Vec<_>>>()?,
185        })
186    }
187}
188
189impl FragReuseIndexDetails {
190    pub fn new_frag_bitmap(&self) -> RoaringBitmap {
191        RoaringBitmap::from_iter(
192            self.versions
193                .iter()
194                .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
195        )
196    }
197}
198
199/// An index that stores row ID maps.
200/// A row ID map describes the mapping from old row address to new address after compactions.
201/// Each version contains the mapping for one round of compaction.
202#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
203pub struct FragReuseIndex {
204    pub uuid: Uuid,
205    pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
206    pub details: FragReuseIndexDetails,
207}
208
209impl DeepSizeOf for FragReuseIndex {
210    fn deep_size_of_children(&self, cx: &mut Context) -> usize {
211        self.row_id_maps.deep_size_of_children(cx) + self.details.deep_size_of_children(cx)
212    }
213}
214
215impl FragReuseIndex {
216    pub fn new(
217        uuid: Uuid,
218        row_id_maps: Vec<HashMap<u64, Option<u64>>>,
219        details: FragReuseIndexDetails,
220    ) -> Self {
221        Self {
222            uuid,
223            row_id_maps,
224            details,
225        }
226    }
227
228    pub fn remap_row_id(&self, row_id: u64) -> Option<u64> {
229        let mut mapped_value = Some(row_id);
230        for row_id_map in self.row_id_maps.iter() {
231            if mapped_value.is_some() {
232                mapped_value = row_id_map
233                    .get(&mapped_value.unwrap())
234                    .copied()
235                    .unwrap_or(mapped_value);
236            }
237        }
238
239        mapped_value
240    }
241
242    pub fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
243        RowAddrTreeMap::from_iter(row_addrs.row_addrs().unwrap().filter_map(|addr| {
244            let addr_as_u64 = u64::from(addr);
245            self.remap_row_id(addr_as_u64)
246        }))
247    }
248
249    pub fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
250        RoaringTreemap::from_iter(row_ids.iter().filter_map(|addr| self.remap_row_id(addr)))
251    }
252
253    /// Remap a record batch that contains a row_id column at index `row_id_idx`
254    /// Currently this assumes there are only 2 columns in the schema,
255    /// which is the case for all indexes.
256    /// For example, for btree, the schema is (value, row_id).
257    /// For vector index storage, the schema is (row_id, vector).
258    pub fn remap_row_ids_record_batch(
259        &self,
260        batch: RecordBatch,
261        row_id_idx: usize,
262    ) -> Result<RecordBatch> {
263        assert_eq!(batch.schema().fields().len(), 2);
264        let other_column_idx = 1 - row_id_idx;
265        let row_ids = batch.column(row_id_idx).as_primitive::<UInt64Type>();
266        let (val_indices, new_row_ids): (Vec<u64>, Vec<u64>) = row_ids
267            .values()
268            .iter()
269            .enumerate()
270            .filter_map(|(idx, old_id)| {
271                self.remap_row_id(*old_id)
272                    .map(|new_id| (idx as u64, new_id))
273            })
274            .unzip();
275        let new_val_indices = UInt64Array::from_iter_values(val_indices);
276        let new_vals =
277            arrow::compute::take(batch.column(other_column_idx), &new_val_indices, None)?;
278
279        let mut batch_data: Vec<(usize, ArrayRef)> = vec![
280            (
281                row_id_idx,
282                Arc::new(UInt64Array::from_iter_values(new_row_ids)) as ArrayRef,
283            ),
284            (other_column_idx, Arc::new(new_vals)),
285        ];
286        batch_data.sort_by_key(|(i, _)| *i);
287        Ok(RecordBatch::try_new(
288            batch.schema(),
289            batch_data.into_iter().map(|(_, item)| item).collect(),
290        )?)
291    }
292
293    pub fn remap_row_ids_array(&self, array: ArrayRef) -> PrimitiveArray<UInt64Type> {
294        let primitive_array = array
295            .as_any()
296            .downcast_ref::<PrimitiveArray<UInt64Type>>()
297            .expect("expected row IDs to be uint64 array");
298        (0..primitive_array.len())
299            .map(|i| {
300                if primitive_array.is_null(i) {
301                    None
302                } else {
303                    self.remap_row_id(primitive_array.value(i))
304                }
305            })
306            .collect()
307    }
308
309    pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
310        for version in self.details.versions.iter() {
311            for group in version.groups.iter() {
312                let mut removed = 0;
313                for old_frag in group.old_frags.iter() {
314                    if fragment_bitmap.remove(old_frag.id as u32) {
315                        removed += 1;
316                    }
317                }
318
319                if removed > 0 {
320                    if removed != group.old_frags.len() {
321                        // Straddle: the index covered only part of this rewrite
322                        // group. Caused by the bug fixed in
323                        // <https://github.com/lance-format/lance/pull/6610>.
324                        // We've already removed the indexed old_frags from the
325                        // bitmap above; deliberately do NOT insert new_frags,
326                        // since the merged fragment also contains rows that
327                        // were never indexed. Affected rows fall through to
328                        // flat scan until the next optimize_indices. The fix
329                        // is persisted on the next write via build_manifest.
330                        tracing::warn!(
331                            "Healing straddling fragment-reuse rewrite group in index bitmap: \
332                             group {:?} was only partially indexed ({} of {} old fragments). \
333                             Affected rows will use flat scan until the next optimize_indices.",
334                            group.old_frags,
335                            removed,
336                            group.old_frags.len(),
337                        );
338                        continue;
339                    }
340
341                    for new_frag in group.new_frags.iter() {
342                        fragment_bitmap.insert(new_frag.id as u32);
343                    }
344                }
345            }
346        }
347        Ok(())
348    }
349}
350
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds a digest for a one-row fragment with no deletions.
    fn digest(id: u64) -> FragDigest {
        FragDigest {
            id,
            physical_rows: 1,
            num_deleted_rows: 0,
        }
    }

    // Plain `#[test]`: nothing here awaits, so no async runtime is needed.
    #[test]
    fn test_serialize_deserialize_index_details() {
        // Create sample FragReuseVersions with different dataset versions
        let version1 = FragReuseVersion {
            dataset_version: 2,
            groups: vec![FragReuseGroup {
                changed_row_addrs: vec![1, 2, 3],
                old_frags: vec![digest(1)],
                new_frags: vec![digest(2), digest(3)],
            }],
        };
        let version2 = FragReuseVersion {
            dataset_version: 1,
            groups: vec![FragReuseGroup {
                changed_row_addrs: vec![4, 5, 6],
                old_frags: vec![digest(2)],
                new_frags: vec![digest(4), digest(5)],
            }],
        };

        // Versions are given newest-first so the roundtrip also exercises sorting.
        let details = FragReuseIndexDetails {
            versions: vec![version1.clone(), version2.clone()],
        };

        // Convert to protobuf format and back.
        let inline_content: InlineContent = (&details).into();
        let roundtrip_details = FragReuseIndexDetails::try_from(inline_content).unwrap();

        // Every field survives, and versions come back sorted oldest to latest.
        assert_eq!(roundtrip_details.versions, vec![version2, version1]);
    }

    #[test]
    fn test_remap_row_id_chains_maps() {
        let first: HashMap<u64, Option<u64>> = HashMap::from([(1, Some(10)), (2, None)]);
        let second: HashMap<u64, Option<u64>> = HashMap::from([(10, Some(100)), (4, None)]);
        let index = FragReuseIndex::new(
            Uuid::nil(),
            vec![first, second],
            FragReuseIndexDetails { versions: vec![] },
        );

        // Moved twice.
        assert_eq!(index.remap_row_id(1), Some(100));
        // Deleted by the first map.
        assert_eq!(index.remap_row_id(2), None);
        // Untouched by either map.
        assert_eq!(index.remap_row_id(3), Some(3));
        // Passes the first map, deleted by the second.
        assert_eq!(index.remap_row_id(4), None);
    }

    #[test]
    fn test_remap_fragment_bitmap_skips_straddling_group() {
        let details = FragReuseIndexDetails {
            versions: vec![FragReuseVersion {
                dataset_version: 1,
                groups: vec![
                    FragReuseGroup {
                        changed_row_addrs: vec![],
                        old_frags: vec![digest(0), digest(1)],
                        new_frags: vec![digest(10)],
                    },
                    FragReuseGroup {
                        changed_row_addrs: vec![],
                        old_frags: vec![digest(2), digest(3)],
                        new_frags: vec![digest(11)],
                    },
                ],
            }],
        };
        let index = FragReuseIndex::new(Uuid::nil(), vec![], details);

        // Group {0, 1} is fully covered and becomes {10}. Group {2, 3} is only
        // partially covered (3 is missing), so 2 is dropped and 11 is not added.
        let mut bitmap = RoaringBitmap::from_iter([0u32, 1, 2, 7]);
        index.remap_fragment_bitmap(&mut bitmap).unwrap();
        assert_eq!(bitmap.iter().collect::<Vec<u32>>(), vec![7, 10]);
    }
}