1use crate::{Index, IndexType};
5use arrow_array::cast::AsArray;
6use arrow_array::types::UInt64Type;
7use arrow_array::{Array, ArrayRef, PrimitiveArray, RecordBatch, UInt64Array};
8use async_trait::async_trait;
9use deepsize::{Context, DeepSizeOf};
10use itertools::Itertools;
11use lance_core::utils::mask::RowAddrTreeMap;
12use lance_core::{Error, Result};
13use lance_table::format::pb::fragment_reuse_index_details::InlineContent;
14use lance_table::format::{ExternalFile, Fragment, pb};
15use roaring::{RoaringBitmap, RoaringTreemap};
16use serde::{Deserialize, Serialize};
17use std::{any::Any, collections::HashMap, sync::Arc};
18use uuid::Uuid;
19
/// Reserved name string for the fragment reuse index.
pub const FRAG_REUSE_INDEX_NAME: &str = "__lance_frag_reuse";
/// File name used for fragment reuse index details.
// NOTE(review): presumably the file written when details are stored externally
// rather than inline — confirm against the writer.
pub const FRAG_REUSE_DETAILS_FILE_NAME: &str = "details.binpb";
22
/// Compact summary of a fragment: its id together with its row counts.
///
/// Can be built from a [`Fragment`] and converted to and from the protobuf
/// `FragmentDigest` message.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragDigest {
    /// Id of the fragment.
    pub id: u64,
    /// Number of physical rows recorded for the fragment.
    pub physical_rows: usize,
    /// Number of deleted rows; `0` when the fragment has no deletion file or
    /// the deletion file does not record a count.
    pub num_deleted_rows: usize,
}
29
30impl From<&FragDigest> for pb::fragment_reuse_index_details::FragmentDigest {
31 fn from(digest: &FragDigest) -> Self {
32 Self {
33 id: digest.id,
34 physical_rows: digest.physical_rows as u64,
35 num_deleted_rows: digest.num_deleted_rows as u64,
36 }
37 }
38}
39
40impl From<&Fragment> for FragDigest {
41 fn from(fragment: &Fragment) -> Self {
42 Self {
43 id: fragment.id,
44 physical_rows: fragment
45 .physical_rows
46 .expect("Fragment doesn't have physical rows recorded"),
47 num_deleted_rows: fragment
48 .deletion_file
49 .as_ref()
50 .and_then(|d| d.num_deleted_rows)
51 .unwrap_or(0),
52 }
53 }
54}
55
56impl TryFrom<pb::fragment_reuse_index_details::FragmentDigest> for FragDigest {
57 type Error = Error;
58
59 fn try_from(digest: pb::fragment_reuse_index_details::FragmentDigest) -> Result<Self> {
60 Ok(Self {
61 id: digest.id,
62 physical_rows: digest.physical_rows as usize,
63 num_deleted_rows: digest.num_deleted_rows as usize,
64 })
65 }
66}
67
/// One rewrite group: a set of old fragments and the new fragments that
/// replace them.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseGroup {
    /// Opaque bytes describing the changed row addresses.
    // NOTE(review): presumably a serialized row-address structure — confirm
    // the encoding against the code that produces this field.
    pub changed_row_addrs: Vec<u8>,
    /// Digests of the fragments that were replaced.
    pub old_frags: Vec<FragDigest>,
    /// Digests of the fragments that replace `old_frags`.
    pub new_frags: Vec<FragDigest>,
}
74
75impl From<&FragReuseGroup> for pb::fragment_reuse_index_details::Group {
76 fn from(group: &FragReuseGroup) -> Self {
77 Self {
78 changed_row_addrs: group.changed_row_addrs.clone(),
79 old_fragments: group.old_frags.iter().map(|f| f.into()).collect(),
80 new_fragments: group.new_frags.iter().map(|f| f.into()).collect(),
81 }
82 }
83}
84
85impl TryFrom<pb::fragment_reuse_index_details::Group> for FragReuseGroup {
86 type Error = Error;
87
88 fn try_from(group: pb::fragment_reuse_index_details::Group) -> Result<Self> {
89 Ok(Self {
90 changed_row_addrs: group.changed_row_addrs,
91 old_frags: group
92 .old_fragments
93 .into_iter()
94 .map(FragDigest::try_from)
95 .collect::<Result<_>>()?,
96 new_frags: group
97 .new_fragments
98 .into_iter()
99 .map(FragDigest::try_from)
100 .collect::<Result<_>>()?,
101 })
102 }
103}
104
/// The rewrite groups recorded for one dataset version.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseVersion {
    /// Dataset version these groups belong to; versions are ordered by this
    /// value when the details are serialized inline.
    pub dataset_version: u64,
    /// Rewrite groups recorded for this version.
    pub groups: Vec<FragReuseGroup>,
}
110
111impl From<&FragReuseVersion> for pb::fragment_reuse_index_details::Version {
112 fn from(version: &FragReuseVersion) -> Self {
113 Self {
114 dataset_version: version.dataset_version,
115 groups: version.groups.iter().map(|g| g.into()).collect(),
116 }
117 }
118}
119
120impl TryFrom<pb::fragment_reuse_index_details::Version> for FragReuseVersion {
121 type Error = Error;
122
123 fn try_from(version: pb::fragment_reuse_index_details::Version) -> Result<Self> {
124 Ok(Self {
125 dataset_version: version.dataset_version,
126 groups: version
127 .groups
128 .into_iter()
129 .map(FragReuseGroup::try_from)
130 .collect::<Result<_>>()?,
131 })
132 }
133}
134
135impl FragReuseVersion {
136 pub fn old_frag_ids(&self) -> Vec<u64> {
137 self.groups
138 .iter()
139 .flat_map(|g| g.old_frags.iter().map(|f| f.id))
140 .collect::<Vec<_>>()
141 }
142
143 pub fn new_frag_ids(&self) -> Vec<u64> {
144 self.groups
145 .iter()
146 .flat_map(|g| g.new_frags.iter().map(|f| f.id))
147 .collect::<Vec<_>>()
148 }
149
150 pub fn new_frag_bitmap(&self) -> RoaringBitmap {
151 RoaringBitmap::from_iter(self.new_frag_ids().iter().map(|&id| id as u32))
152 }
153}
154
/// The two forms in which fragment reuse index details can be carried.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub enum FragReuseIndexDetailsContentType {
    /// The details are carried directly as a [`FragReuseIndexDetails`] value.
    Inline(FragReuseIndexDetails),
    /// The details are referenced through an [`ExternalFile`].
    External(ExternalFile),
}
160
/// The full content of a fragment reuse index: one entry per recorded
/// dataset version.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize, DeepSizeOf)]
pub struct FragReuseIndexDetails {
    /// Recorded versions, each holding its rewrite groups.
    pub versions: Vec<FragReuseVersion>,
}
165
166impl From<&FragReuseIndexDetails> for InlineContent {
167 fn from(details: &FragReuseIndexDetails) -> Self {
168 Self {
169 versions: details
170 .versions
171 .iter()
172 .map(|m| m.into())
173 .sorted_by_key(|v: &pb::fragment_reuse_index_details::Version| v.dataset_version)
175 .collect(),
176 }
177 }
178}
179
180impl TryFrom<InlineContent> for FragReuseIndexDetails {
181 type Error = Error;
182
183 fn try_from(content: InlineContent) -> Result<Self> {
184 Ok(Self {
185 versions: content
186 .versions
187 .into_iter()
188 .map(|m| m.try_into())
189 .collect::<Result<Vec<_>>>()?,
190 })
191 }
192}
193
194impl FragReuseIndexDetails {
195 pub fn new_frag_bitmap(&self) -> RoaringBitmap {
196 RoaringBitmap::from_iter(
197 self.versions
198 .iter()
199 .flat_map(|v| v.new_frag_ids().into_iter().map(|id| id as u32)),
200 )
201 }
202}
203
/// In-memory fragment reuse index: the recorded details plus the row id
/// remapping tables used to translate old row ids to new ones.
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub struct FragReuseIndex {
    /// Identifier of this index.
    pub uuid: Uuid,
    /// Remapping tables, applied in order when remapping a row id. A key
    /// mapped to `None` means the row id has no new value; a missing key
    /// leaves the row id unchanged.
    pub row_id_maps: Vec<HashMap<u64, Option<u64>>>,
    /// The versions and rewrite groups this index was built from.
    pub details: FragReuseIndexDetails,
}
213
214impl DeepSizeOf for FragReuseIndex {
215 fn deep_size_of_children(&self, cx: &mut Context) -> usize {
216 self.row_id_maps.deep_size_of_children(cx) + self.details.deep_size_of_children(cx)
217 }
218}
219
220impl FragReuseIndex {
221 pub fn new(
222 uuid: Uuid,
223 row_id_maps: Vec<HashMap<u64, Option<u64>>>,
224 details: FragReuseIndexDetails,
225 ) -> Self {
226 Self {
227 uuid,
228 row_id_maps,
229 details,
230 }
231 }
232
233 pub fn remap_row_id(&self, row_id: u64) -> Option<u64> {
234 let mut mapped_value = Some(row_id);
235 for row_id_map in self.row_id_maps.iter() {
236 if mapped_value.is_some() {
237 mapped_value = row_id_map
238 .get(&mapped_value.unwrap())
239 .copied()
240 .unwrap_or(mapped_value);
241 }
242 }
243
244 mapped_value
245 }
246
247 pub fn remap_row_addrs_tree_map(&self, row_addrs: &RowAddrTreeMap) -> RowAddrTreeMap {
248 RowAddrTreeMap::from_iter(row_addrs.row_addrs().unwrap().filter_map(|addr| {
249 let addr_as_u64 = u64::from(addr);
250 self.remap_row_id(addr_as_u64)
251 }))
252 }
253
254 pub fn remap_row_ids_roaring_tree_map(&self, row_ids: &RoaringTreemap) -> RoaringTreemap {
255 RoaringTreemap::from_iter(row_ids.iter().filter_map(|addr| self.remap_row_id(addr)))
256 }
257
258 pub fn remap_row_ids_record_batch(
264 &self,
265 batch: RecordBatch,
266 row_id_idx: usize,
267 ) -> Result<RecordBatch> {
268 assert_eq!(batch.schema().fields().len(), 2);
269 let other_column_idx = 1 - row_id_idx;
270 let row_ids = batch.column(row_id_idx).as_primitive::<UInt64Type>();
271 let (val_indices, new_row_ids): (Vec<u64>, Vec<u64>) = row_ids
272 .values()
273 .iter()
274 .enumerate()
275 .filter_map(|(idx, old_id)| {
276 self.remap_row_id(*old_id)
277 .map(|new_id| (idx as u64, new_id))
278 })
279 .unzip();
280 let new_val_indices = UInt64Array::from_iter_values(val_indices);
281 let new_vals =
282 arrow_select::take::take(batch.column(other_column_idx), &new_val_indices, None)?;
283
284 let mut batch_data: Vec<(usize, ArrayRef)> = vec![
285 (
286 row_id_idx,
287 Arc::new(UInt64Array::from_iter_values(new_row_ids)) as ArrayRef,
288 ),
289 (other_column_idx, Arc::new(new_vals)),
290 ];
291 batch_data.sort_by_key(|(i, _)| *i);
292 Ok(RecordBatch::try_new(
293 batch.schema(),
294 batch_data.into_iter().map(|(_, item)| item).collect(),
295 )?)
296 }
297
298 pub fn remap_row_ids_array(&self, array: ArrayRef) -> PrimitiveArray<UInt64Type> {
299 let primitive_array = array
300 .as_any()
301 .downcast_ref::<PrimitiveArray<UInt64Type>>()
302 .expect("expected row IDs to be uint64 array");
303 (0..primitive_array.len())
304 .map(|i| {
305 if primitive_array.is_null(i) {
306 None
307 } else {
308 self.remap_row_id(primitive_array.value(i))
309 }
310 })
311 .collect()
312 }
313
314 pub fn remap_fragment_bitmap(&self, fragment_bitmap: &mut RoaringBitmap) -> Result<()> {
315 for version in self.details.versions.iter() {
316 for group in version.groups.iter() {
317 let mut removed = 0;
318 for old_frag in group.old_frags.iter() {
319 if fragment_bitmap.remove(old_frag.id as u32) {
320 removed += 1;
321 }
322 }
323
324 if removed > 0 {
325 if removed != group.old_frags.len() {
326 tracing::warn!(
336 "Healing straddling fragment-reuse rewrite group in index bitmap: \
337 group {:?} was only partially indexed ({} of {} old fragments). \
338 Affected rows will use flat scan until the next optimize_indices.",
339 group.old_frags,
340 removed,
341 group.old_frags.len(),
342 );
343 continue;
344 }
345
346 for new_frag in group.new_frags.iter() {
347 fragment_bitmap.insert(new_frag.id as u32);
348 }
349 }
350 }
351 }
352 Ok(())
353 }
354}
355
/// Serializable statistics reported for a fragment reuse index.
#[derive(Serialize)]
struct FragReuseStatistics {
    /// Number of versions recorded in the index details.
    num_versions: usize,
}
360
361#[async_trait]
362impl Index for FragReuseIndex {
363 fn as_any(&self) -> &dyn Any {
364 self
365 }
366
367 fn as_index(self: Arc<Self>) -> Arc<dyn Index> {
368 self
369 }
370
371 fn as_vector_index(self: Arc<Self>) -> Result<Arc<dyn crate::vector::VectorIndex>> {
372 Err(Error::not_supported_source(
373 "FragReuseIndex is not a vector index".into(),
374 ))
375 }
376
377 fn statistics(&self) -> Result<serde_json::Value> {
378 let stats = FragReuseStatistics {
379 num_versions: self.details.versions.len(),
380 };
381 serde_json::to_value(stats).map_err(|e| {
382 Error::internal(format!(
383 "failed to serialize fragment reuse index statistics: {}",
384 e
385 ))
386 })
387 }
388
389 async fn prewarm(&self) -> Result<()> {
390 Ok(())
391 }
392
393 fn index_type(&self) -> IndexType {
394 IndexType::FragmentReuse
395 }
396
397 async fn calculate_included_frags(&self) -> Result<RoaringBitmap> {
398 unimplemented!()
399 }
400}
401
#[cfg(test)]
mod tests {

    use super::*;

    /// Builds digests with one physical row and no deletions for each id.
    fn digests(ids: &[u64]) -> Vec<FragDigest> {
        ids.iter()
            .map(|&id| FragDigest {
                id,
                physical_rows: 1,
                num_deleted_rows: 0,
            })
            .collect()
    }

    /// Builds a version holding a single rewrite group.
    fn single_group_version(
        dataset_version: u64,
        changed_row_addrs: &[u8],
        old_ids: &[u64],
        new_ids: &[u64],
    ) -> FragReuseVersion {
        FragReuseVersion {
            dataset_version,
            groups: vec![FragReuseGroup {
                changed_row_addrs: changed_row_addrs.to_vec(),
                old_frags: digests(old_ids),
                new_frags: digests(new_ids),
            }],
        }
    }

    /// Asserts the dataset version and the content of the first group.
    fn assert_single_group_version(
        version: &FragReuseVersion,
        dataset_version: u64,
        changed_row_addrs: &[u8],
        old_ids: &[u64],
        new_ids: &[u64],
    ) {
        assert_eq!(version.dataset_version, dataset_version);
        let group = &version.groups[0];
        assert_eq!(group.changed_row_addrs.as_slice(), changed_row_addrs);
        assert_eq!(group.new_frags, digests(new_ids));
        assert_eq!(group.old_frags, digests(old_ids));
    }

    #[tokio::test]
    async fn test_serialize_deserialize_index_details() {
        // Versions are supplied out of order (2 before 1) so the round trip
        // also checks that serialization sorts by dataset version.
        let details = FragReuseIndexDetails {
            versions: vec![
                single_group_version(2, &[1, 2, 3], &[1], &[2, 3]),
                single_group_version(1, &[4, 5, 6], &[2], &[4, 5]),
            ],
        };

        let inline_content = InlineContent::from(&details);
        let roundtrip_details = FragReuseIndexDetails::try_from(inline_content).unwrap();

        assert_eq!(roundtrip_details.versions.len(), 2);
        assert_single_group_version(
            &roundtrip_details.versions[0],
            1,
            &[4, 5, 6],
            &[2],
            &[4, 5],
        );
        assert_single_group_version(
            &roundtrip_details.versions[1],
            2,
            &[1, 2, 3],
            &[1],
            &[2, 3],
        );
    }
}