1use std::{collections::HashMap, sync::Arc};
5
6use arrow_array::cast::AsArray;
7use arrow_array::types::UInt64Type;
8use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
9use lance_core::deepsize::{Context, DeepSizeOf};
10use lance_core::{Error, Result};
11use lance_select::RowAddrTreeMap;
12use roaring::{RoaringBitmap, RoaringTreemap};
13use serde::{Deserialize, Serialize};
14use uuid::Uuid;
15
16use crate::format::pb::fragment_reuse_index_details::InlineContent;
17use crate::format::{ExternalFile, Fragment, pb};
18
/// Index name used for the fragment reuse index.
pub const FRAG_REUSE_INDEX_NAME: &str = "__lance_frag_reuse";
/// File name for serialized fragment reuse index details (the `.binpb` extension suggests a
/// binary protobuf encoding — confirm against the writer).
pub const FRAG_REUSE_DETAILS_FILE_NAME: &str = "details.binpb";
21
/// Compact summary of a fragment: its ID plus its physical and deleted row counts.
///
/// Converts to and from the protobuf `FragmentDigest` message and can be built from a
/// manifest [`Fragment`].
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragDigest {
    /// Fragment ID.
    pub id: u64,
    /// The fragment's recorded `physical_rows` value.
    pub physical_rows: usize,
    /// Deleted-row count taken from the fragment's deletion file (0 when none is recorded).
    pub num_deleted_rows: usize,
}
28
29impl From<&FragDigest> for pb::fragment_reuse_index_details::FragmentDigest {
30 fn from(digest: &FragDigest) -> Self {
31 Self {
32 id: digest.id,
33 physical_rows: digest.physical_rows as u64,
34 num_deleted_rows: digest.num_deleted_rows as u64,
35 }
36 }
37}
38
39impl From<&Fragment> for FragDigest {
40 fn from(fragment: &Fragment) -> Self {
41 Self {
42 id: fragment.id,
43 physical_rows: fragment
44 .physical_rows
45 .expect("Fragment doesn't have physical rows recorded"),
46 num_deleted_rows: fragment
47 .deletion_file
48 .as_ref()
49 .and_then(|d| d.num_deleted_rows)
50 .unwrap_or(0),
51 }
52 }
53}
54
55impl TryFrom<pb::fragment_reuse_index_details::FragmentDigest> for FragDigest {
56 type Error = Error;
57
58 fn try_from(digest: pb::fragment_reuse_index_details::FragmentDigest) -> Result<Self> {
59 Ok(Self {
60 id: digest.id,
61 physical_rows: digest.physical_rows as usize,
62 num_deleted_rows: digest.num_deleted_rows as usize,
63 })
64 }
65}
66
/// One rewrite group: a set of old fragments that were replaced by a set of new fragments.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseGroup {
    /// Serialized bytes describing the group's changed row addresses (encoding is not visible
    /// in this module — TODO confirm format).
    pub changed_row_addrs: Vec<u8>,
    /// Fragments removed by the rewrite.
    pub old_frags: Vec<FragDigest>,
    /// Fragments produced by the rewrite.
    pub new_frags: Vec<FragDigest>,
}
73
74impl From<&FragReuseGroup> for pb::fragment_reuse_index_details::Group {
75 fn from(group: &FragReuseGroup) -> Self {
76 Self {
77 changed_row_addrs: group.changed_row_addrs.clone(),
78 old_fragments: group.old_frags.iter().map(|f| f.into()).collect(),
79 new_fragments: group.new_frags.iter().map(|f| f.into()).collect(),
80 }
81 }
82}
83
84impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
85 type Error = Error;
86
87 fn try_from(group: pb::fragment_reuse_index_details::Group) -> Result<Self> {
88 Ok(Self {
89 changed_row_addrs: group.changed_row_addrs,
90 old_frags: group
91 .old_fragments
92 .into_iter()
93 .map(FragDigest::try_from)
94 .collect::<Result<_>>()?,
95 new_frags: group
96 .new_fragments
97 .into_iter()
98 .map(FragDigest::try_from)
99 .collect::<Result<_>>()?,
100 })
101 }
102}
103
/// The rewrite groups recorded for a single dataset version.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseVersion {
    /// Dataset version this entry belongs to.
    pub dataset_version: u64,
    /// Rewrite groups recorded for this version.
    pub groups: Vec<FragReuseGroup>,
}
109
110impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
111 fn from(version: &FragReuseVersion) -> Self {
112 Self {
113 dataset_version: version.dataset_version,
114 groups: version.groups.iter().map(|g| g.into()).collect(),
115 }
116 }
117}
118
119impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
120 type Error = Error;
121
122 fn try_from(version: pb::fragment_reuse_index_details::Version) -> Result<Self> {
123 Ok(Self {
124 dataset_version: version.dataset_version,
125 groups: version
126 .groups
127 .into_iter()
128 .map(FragReuseGroup::try_from)
129 .collect::<Result<_>>()?,
130 })
131 }
132}
133
134impl FragReuseVersion {
135 pub fn old_frag_ids(&self) -> Vec<u64> {
136 self.groups
137 .iter()
138 .flat_map(|g| g.old_frags.iter().map(|f| f.id))
139 .collect::<Vec<_>>()
140 }
141
142 pub fn new_frag_ids(&self) -> Vec<u64> {
143 self.groups
144 .iter()
145 .flat_map(|g| g.new_frags.iter().map(|f| f.id))
146 .collect::<Vec<_>>()
147 }
148
149 pub fn new_frag_bitmap(&self) -> RoaringBitmap {
150 RoaringBitmap::from_iter(self.new_frag_ids().iter().map(|&id| id as u32))
151 }
152}
153
/// Where fragment reuse index details live: carried inline, or referenced as an external file.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum FragReuseIndexDetailsContentType {
    /// Details held directly in memory.
    Inline(FragReuseIndexDetails),
    /// Details stored in a separate file.
    External(ExternalFile),
}
159
/// All recorded fragment reuse versions.
///
/// In-memory order is not enforced here. Conversion to `InlineContent` sorts the versions by
/// `dataset_version`.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseIndexDetails {
    /// Per-dataset-version rewrite records.
    pub versions: Vec<FragReuseVersion>,
}
164
165impl From<&FragReuseIndexDetails> for InlineContent {
166 fn from(details: &FragReuseIndexDetails) -> Self {
167 let mut versions: Vec<pb::fragment_reuse_index_details::Version> =
168 details.versions.iter().map(|m| m.into()).collect();
169 versions.sort_by_key(|v| v.dataset_version);
171 Self { versions }
172 }
173}
174
175impl TryFrom<InlineContent> for FragReuseIndexDetails {
176 type Error = Error;
177
178 fn try_from(content: InlineContent) -> Result<Self> {
179 Ok(Self {
180 versions: content
181 .versions
182 .into_iter()
183 .map(|m| m.try_into())
184 .collect::<Result<Vec<_>>>()?,
185 })
186 }
187}
188
189impl FragReuseIndexDetails {
190 pub fn new_frag_bitmap(&self) -> RoaringBitmap {
191 RoaringBitmap::from_iter(
192 self.versions
193 .iter()
194 .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
195 )
196 }
197}
198
/// In-memory fragment reuse index: row-ID remapping tables held alongside the index details.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FragReuseIndex {
    /// Identifier of this index.
    pub uuid: Uuid,
    /// Maps applied in order by `remap_row_id`. A key maps to `Some(new_id)` to rewrite a row
    /// ID or to `None` to drop it. IDs missing from a map pass through unchanged.
    pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
    /// Recorded rewrite versions, used by `remap_fragment_bitmap`.
    pub details: FragReuseIndexDetails,
}
208
209impl DeepSizeOf for FragReuseIndex {
210 fn deep_size_of_children(&self, cx: &mut Context) -> usize {
211 self.row_id_maps.deep_size_of_children(cx) + self.details.deep_size_of_children(cx)
212 }
213}
214
215impl FragReuseIndex {
216 pub fn new(
217 uuid: Uuid,
218 row_id_maps: Vec<HashMap<u64, Option<u64>>>,
219 details: FragReuseIndexDetails,
220 ) -> Self {
221 Self {
222 uuid,
223 row_id_maps,
224 details,
225 }
226 }
227
228 pub fn remap_row_id(&self, row_id: u64) -> Option<u64> {
229 let mut mapped_value = Some(row_id);
230 for row_id_map in self.row_id_maps.iter() {
231 if mapped_value.is_some() {
232 mapped_value = row_id_map
233 .get(&mapped_value.unwrap())
234 .copied()
235 .unwrap_or(mapped_value);
236 }
237 }
238
239 mapped_value
240 }
241
242 pub fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
243 RowAddrTreeMap::from_iter(row_addrs.row_addrs().unwrap().filter_map(|addr| {
244 let addr_as_u64 = u64::from(addr);
245 self.remap_row_id(addr_as_u64)
246 }))
247 }
248
249 pub fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
250 RoaringTreemap::from_iter(row_ids.iter().filter_map(|addr| self.remap_row_id(addr)))
251 }
252
253 pub fn remap_row_ids_record_batch(
259 &self,
260 batch: RecordBatch,
261 row_id_idx: usize,
262 ) -> Result<RecordBatch> {
263 assert_eq!(batch.schema().fields().len(), 2);
264 let other_column_idx = 1 - row_id_idx;
265 let row_ids = batch.column(row_id_idx).as_primitive::<UInt64Type>();
266 let (val_indices, new_row_ids): (Vec<u64>, Vec<u64>) = row_ids
267 .values()
268 .iter()
269 .enumerate()
270 .filter_map(|(idx, old_id)| {
271 self.remap_row_id(*old_id)
272 .map(|new_id| (idx as u64, new_id))
273 })
274 .unzip();
275 let new_val_indices = UInt64Array::from_iter_values(val_indices);
276 let new_vals =
277 arrow::compute::take(batch.column(other_column_idx), &new_val_indices, None)?;
278
279 let mut batch_data: Vec<(usize, ArrayRef)> = vec![
280 (
281 row_id_idx,
282 Arc::new(UInt64Array::from_iter_values(new_row_ids)) as ArrayRef,
283 ),
284 (other_column_idx, Arc::new(new_vals)),
285 ];
286 batch_data.sort_by_key(|(i, _)| *i);
287 Ok(RecordBatch::try_new(
288 batch.schema(),
289 batch_data.into_iter().map(|(_, item)| item).collect(),
290 )?)
291 }
292
293 pub fn remap_row_ids_array(&self, array: ArrayRef) -> PrimitiveArray<UInt64Type> {
294 let primitive_array = array
295 .as_any()
296 .downcast_ref::<PrimitiveArray<UInt64Type>>()
297 .expect("expected row IDs to be uint64 array");
298 (0..primitive_array.len())
299 .map(|i| {
300 if primitive_array.is_null(i) {
301 None
302 } else {
303 self.remap_row_id(primitive_array.value(i))
304 }
305 })
306 .collect()
307 }
308
309 pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
310 for version in self.details.versions.iter() {
311 for group in version.groups.iter() {
312 let mut removed = 0;
313 for old_frag in group.old_frags.iter() {
314 if fragment_bitmap.remove(old_frag.id as u32) {
315 removed += 1;
316 }
317 }
318
319 if removed > 0 {
320 if removed != group.old_frags.len() {
321 tracing::warn!(
331 "Healing straddling fragment-reuse rewrite group in index bitmap: \
332 group {:?} was only partially indexed ({} of {} old fragments). \
333 Affected rows will use flat scan until the next optimize_indices.",
334 group.old_frags,
335 removed,
336 group.old_frags.len(),
337 );
338 continue;
339 }
340
341 for new_frag in group.new_frags.iter() {
342 fragment_bitmap.insert(new_frag.id as u32);
343 }
344 }
345 }
346 }
347 Ok(())
348 }
349}
350
#[cfg(test)]
mod tests {

    use super::*;

    /// Digest for a one-row fragment with no deletions.
    fn digest(id: u64) -> FragDigest {
        FragDigest {
            id,
            physical_rows: 1,
            num_deleted_rows: 0,
        }
    }

    /// One-row digests for each ID in `ids`, in order.
    fn digests(ids: &[u64]) -> Vec<FragDigest> {
        ids.iter().copied().map(digest).collect()
    }

    /// A version holding exactly one rewrite group.
    fn one_group_version(
        dataset_version: u64,
        changed_row_addrs: Vec<u8>,
        old_ids: &[u64],
        new_ids: &[u64],
    ) -> FragReuseVersion {
        FragReuseVersion {
            dataset_version,
            groups: vec![FragReuseGroup {
                changed_row_addrs,
                old_frags: digests(old_ids),
                new_frags: digests(new_ids),
            }],
        }
    }

    #[tokio::test]
    async fn test_serialize_deserialize_index_details() {
        // The versions are listed newest-first on purpose. Converting to `InlineContent`
        // sorts them by dataset version, so the round trip must come back in ascending order.
        let details = FragReuseIndexDetails {
            versions: vec![
                one_group_version(2, vec![1, 2, 3], &[1], &[2, 3]),
                one_group_version(1, vec![4, 5, 6], &[2], &[4, 5]),
            ],
        };

        let inline_content: InlineContent = (&details).into();
        let roundtrip_details = FragReuseIndexDetails::try_from(inline_content).unwrap();

        // (dataset_version, changed_row_addrs, old fragment IDs, new fragment IDs)
        let expected: [(u64, Vec<u8>, Vec<u64>, Vec<u64>); 2] = [
            (1, vec![4, 5, 6], vec![2], vec![4, 5]),
            (2, vec![1, 2, 3], vec![1], vec![2, 3]),
        ];

        assert_eq!(roundtrip_details.versions.len(), expected.len());
        for (version, (dataset_version, changed_row_addrs, old_ids, new_ids)) in
            roundtrip_details.versions.iter().zip(expected.iter())
        {
            assert_eq!(version.dataset_version, *dataset_version);
            let group = &version.groups[0];
            assert_eq!(&group.changed_row_addrs, changed_row_addrs);
            assert_eq!(group.new_frags, digests(new_ids));
            assert_eq!(group.old_frags, digests(old_ids));
        }
    }
}