// lance/dataset/transaction.rs
1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4//! Transaction definitions for updating datasets
5//!
6//! Prior to creating a new manifest, a transaction must be created representing
7//! the changes being made to the dataset. By representing them as incremental
8//! changes, we can detect whether concurrent operations are compatible with
9//! one another. We can also rebuild manifests when retrying committing a
10//! manifest.
11//!
12//! ## Conflict Resolution
13//!
14//! Transactions are compatible with one another if they don't conflict.
15//! Currently, conflict resolution always assumes a Serializable isolation
16//! level.
17//!
18//! Below are the compatibilities between conflicting transactions. The columns
19//! represent the operation that has been applied, while the rows represent the
20//! operation that is being checked for compatibility to see if it can retry.
21//! ✅ indicates that the operation is compatible, while ❌ indicates that it is
22//! a conflict. Some operations have additional conditions that must be met for
23//! them to be compatible.
24//!
25//! NOTE/TODO(rmeng): DataReplacement conflict resolution is not fully implemented
26//!
27//! |                  | Append | Delete / Update | Overwrite/Create | Create Index | Rewrite | Merge | Project | UpdateConfig | DataReplacement |
28//! |------------------|--------|-----------------|------------------|--------------|---------|-------|---------|--------------|-----------------|
29//! | Append           | ✅     | ✅              | ❌                | ✅           | ✅      | ❌     | ❌      | ✅           | ✅
30//! | Delete / Update  | ✅     | 1️⃣              | ❌                | ✅           | 1️⃣      | ❌     | ❌      | ✅           | ✅
31//! | Overwrite/Create | ✅     | ✅              | ✅                | ✅           | ✅      | ✅     | ✅      | 2️⃣           | ✅
32//! | Create index     | ✅     | ✅              | ❌                | ✅           | ✅      | ✅     | ✅      | ✅           | 3️⃣
33//! | Rewrite          | ✅     | 1️⃣              | ❌                | ❌           | 1️⃣      | ❌     | ❌      | ✅           | 3️⃣
34//! | Merge            | ❌     | ❌              | ❌                | ❌           | ✅      | ❌     | ❌      | ✅           | ✅
35//! | Project          | ✅     | ✅              | ❌                | ❌           | ✅      | ❌     | ✅      | ✅           | ✅
36//! | UpdateConfig     | ✅     | ✅              | 2️⃣                | ✅           | ✅      | ✅     | ✅      | 2️⃣           | ✅
37//! | DataReplacement  | ✅     | ✅              | ❌                | 3️⃣           | 1️⃣      | ✅     | 3️⃣      | ✅           | 3️⃣
38//!
39//! 1️⃣ Delete, update, and rewrite are compatible with each other and themselves only if
40//! they affect distinct fragments. Otherwise, they conflict.
41//! 2️⃣ Operations that mutate the config conflict if one of the operations upserts a key
//! that is referenced by another concurrent operation or if both operations modify the schema
43//! metadata or the same field metadata.
44//! 3️⃣ DataReplacement on a column without index is compatible with any operation AS LONG AS
45//! the operation does not modify the region of the column being replaced.
46//!
47
48use super::ManifestWriteConfig;
49use super::write::merge_insert::inserted_rows::KeyExistenceFilter;
50use crate::dataset::transaction::UpdateMode::RewriteRows;
51use crate::index::mem_wal::update_mem_wal_index_merged_generations;
52use crate::utils::temporal::timestamp_to_nanos;
53use deepsize::DeepSizeOf;
54use lance_core::{Error, Result, datatypes::Schema};
55use lance_file::{datatypes::Fields, version::LanceFileVersion};
56use lance_index::mem_wal::MergedGeneration;
57use lance_index::{frag_reuse::FRAG_REUSE_INDEX_NAME, is_system_index};
58use lance_io::object_store::ObjectStore;
59use lance_table::feature_flags::{FLAG_STABLE_ROW_IDS, apply_feature_flags};
60use lance_table::rowids::read_row_ids;
61use lance_table::{
62    format::{
63        BasePath, DataFile, DataStorageFormat, Fragment, IndexMetadata, Manifest, RowIdMeta, pb,
64    },
65    io::{
66        commit::CommitHandler,
67        manifest::{read_manifest, read_manifest_indexes},
68    },
69    rowids::{RowIdSequence, write_row_ids},
70};
71use object_store::path::Path;
72use roaring::RoaringBitmap;
73use std::cmp::Ordering;
74use std::{
75    collections::{HashMap, HashSet},
76    sync::Arc,
77};
78use uuid::Uuid;
79
/// A change to a dataset that can be retried
///
/// This contains enough information to be able to build the next manifest,
/// given the current manifest.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct Transaction {
    /// The version of the table this transaction is based off of. If this is
    /// the first transaction, this should be 0.
    pub read_version: u64,
    /// Unique identifier for this transaction.
    pub uuid: String,
    /// The operation this transaction applies to the dataset.
    pub operation: Operation,
    /// Optional tag to associate with the version created by this transaction.
    pub tag: Option<String>,
    /// Optional key/value properties supplied with the transaction.
    pub transaction_properties: Option<Arc<HashMap<String, String>>>,
}
94
/// A single data replacement: pairs a fragment with the new [`DataFile`]
/// that replaces column data within it.
///
/// NOTE(review): the `u64` is presumably the id of the fragment being
/// modified — confirm against the `DataReplacement` commit path.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct DataReplacementGroup(pub u64, pub DataFile);
97
/// An entry for a map update. If value is None, the key will be removed from the map.
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct UpdateMapEntry {
    /// The key of the map entry to update.
    pub key: String,
    /// The value to set for the key.
    ///
    /// `None` removes the key from the map instead of setting a value.
    pub value: Option<String>,
}
106
107impl From<(String, Option<String>)> for UpdateMapEntry {
108    fn from((key, value): (String, Option<String>)) -> Self {
109        Self { key, value }
110    }
111}
112
113impl From<(String, String)> for UpdateMapEntry {
114    fn from((key, value): (String, String)) -> Self {
115        Self::from((key, Some(value)))
116    }
117}
118
119impl From<(&str, Option<&str>)> for UpdateMapEntry {
120    fn from((key, value): (&str, Option<&str>)) -> Self {
121        Self {
122            key: key.to_string(),
123            value: value.map(str::to_owned),
124        }
125    }
126}
127
128impl From<(&str, &str)> for UpdateMapEntry {
129    fn from((key, value): (&str, &str)) -> Self {
130        Self::from((key, Some(value)))
131    }
132}
133
/// Represents updates to a map (either incremental or replacement)
#[derive(Debug, Clone, DeepSizeOf, PartialEq)]
pub struct UpdateMap {
    /// The individual key/value updates to apply.
    pub update_entries: Vec<UpdateMapEntry>,
    /// If true, the map will be replaced entirely with the new entries.
    /// If false, the new entries will be merged with the existing map.
    pub replace: bool,
}
142
/// An operation on a dataset.
#[derive(Debug, Clone, DeepSizeOf)]
pub enum Operation {
    /// Adding new fragments to the dataset. The fragments contained within
    /// haven't yet been assigned a final ID.
    Append { fragments: Vec<Fragment> },
    /// Updated fragments contain those that have been modified with new deletion
    /// files. The deleted fragment IDs are those that should be removed from
    /// the manifest.
    Delete {
        /// Fragments that received new deletion files from this delete.
        updated_fragments: Vec<Fragment>,
        /// IDs of fragments that should be removed from the manifest.
        deleted_fragment_ids: Vec<u64>,
        /// The predicate used to select the rows to delete.
        predicate: String,
    },
    /// Overwrite the entire dataset with the given fragments. This is also
    /// used when initially creating a table.
    Overwrite {
        /// The fragments making up the new contents of the dataset.
        fragments: Vec<Fragment>,
        /// The schema of the new contents of the dataset.
        schema: Schema,
        /// Config keys to upsert as part of the overwrite, if any.
        config_upsert_values: Option<HashMap<String, String>>,
        /// Base paths to register when the table is created, if any.
        initial_bases: Option<Vec<BasePath>>,
    },
    /// A new index has been created.
    CreateIndex {
        /// The new secondary indices,
        /// any existing indices with the same name will be replaced.
        new_indices: Vec<IndexMetadata>,
        /// The indices that have been modified.
        removed_indices: Vec<IndexMetadata>,
    },
    /// Data is rewritten but *not* modified. This is used for things like
    /// compaction or re-ordering. Contains the old fragments and the new
    /// ones that have been replaced.
    ///
    /// This operation will modify the row addresses of existing rows and
    /// so any existing index covering a rewritten fragment will need to be
    /// remapped.
    Rewrite {
        /// Groups of fragments that have been modified
        groups: Vec<RewriteGroup>,
        /// Indices that have been updated with the new row addresses
        rewritten_indices: Vec<RewrittenIndex>,
        /// The fragment reuse index to be created or updated to
        frag_reuse_index: Option<IndexMetadata>,
    },
    /// Replace data in a column in the dataset with new data. This is used for
    /// null column population where we replace an entirely null column with a
    /// new column that has data.
    ///
    /// This operation will only allow replacing files that contain the same schema
    /// e.g. if the original files contain columns A, B, C and the new files contain
    /// only columns A, B then the operation is not allowed. As we would need to split
    /// the original files into two files, one with column A, B and the other with column C.
    ///
    /// Corollary to the above: the operation will also not allow replacing files unless the
    /// affected columns all have the same datafile layout across the fragments being replaced.
    ///
    /// e.g. if fragments being replaced contain files with different schema layouts on
    /// the column being replaced, the operation is not allowed.
    /// say `frag_1: [A] [B, C]` and `frag_2: [A, B] [C]` and we are trying to replace column A
    /// with a new column A, the operation is not allowed.
    DataReplacement {
        /// The set of replacements to apply; see [`DataReplacementGroup`].
        replacements: Vec<DataReplacementGroup>,
    },
    /// Merge a new column in
    /// 'fragments' is the final fragments including all data files; the new fragments must align with old ones at rows.
    /// 'schema' is not required to include existing columns, which means we could use Merge to drop column data
    Merge {
        fragments: Vec<Fragment>,
        schema: Schema,
    },
    /// Restore an old version of the database
    Restore { version: u64 },
    /// Reserves fragment ids for future use
    /// This can be used when row ids need to be known before a transaction
    /// has been committed.  It is used during a rewrite operation to allow
    /// indices to be remapped to the new row ids as part of the operation.
    ReserveFragments { num_fragments: u32 },

    /// Update values in the dataset.
    ///
    /// Updates are generally vertical or horizontal.
    ///
    /// A vertical update adds new rows.  In this case, the updated_fragments
    /// will only have existing rows deleted and will not have any new fields added.
    /// All new data will be contained in new_fragments.
    /// This is what is used by a merge_insert that matches the whole schema and what
    /// is used by the dataset updater.
    ///
    /// A horizontal update adds new columns.  In this case, the updated fragments
    /// may have fields removed or added.  It is even possible for a field to be tombstoned
    /// and then added back in the same update. (which is a field modification).  If any
    /// fields are modified in this way then they need to be added to the fields_modified list.
    /// This way we can correctly update the indices.
    /// This is what is used by a merge insert that does not match the whole schema.
    Update {
        /// Ids of fragments that have been removed by this update
        removed_fragment_ids: Vec<u64>,
        /// Fragments that have been updated
        updated_fragments: Vec<Fragment>,
        /// Fragments that have been added
        new_fragments: Vec<Fragment>,
        /// The fields that have been modified
        fields_modified: Vec<u32>,
        /// List of MemWAL region generations to mark as merged after this transaction
        merged_generations: Vec<MergedGeneration>,
        /// The fields used to judge whether to preserve the new frag's id in
        /// the frag bitmap of the specified indices.
        fields_for_preserving_frag_bitmap: Vec<u32>,
        /// The mode of update (see [`UpdateMode`]); `None` leaves the choice
        /// to the default behavior.
        update_mode: Option<UpdateMode>,
        /// Optional filter for detecting conflicts on inserted row keys.
        /// Only tracks keys from INSERT operations during merge insert, not updates.
        inserted_rows_filter: Option<KeyExistenceFilter>,
    },

    /// Project to a new schema. This only changes the schema, not the data.
    Project { schema: Schema },

    /// Update the dataset configuration.
    UpdateConfig {
        /// Updates to the dataset config map, if any.
        config_updates: Option<UpdateMap>,
        /// Updates to the table metadata map, if any.
        table_metadata_updates: Option<UpdateMap>,
        /// Updates to the schema metadata map, if any.
        schema_metadata_updates: Option<UpdateMap>,
        /// Per-field metadata updates, keyed by field id.
        field_metadata_updates: HashMap<i32, UpdateMap>,
    },
    /// Update merged generations in MemWAL index.
    /// This is used during merge-insert to atomically record which
    /// generations have been merged to the base table.
    UpdateMemWalState {
        merged_generations: Vec<MergedGeneration>,
    },

    /// Clone a dataset.
    Clone {
        /// True for a shallow clone, false for a deep clone.
        is_shallow: bool,
        /// The name of the reference (e.g. tag or branch) cloned from, if any.
        ref_name: Option<String>,
        /// The version of the source that was cloned.
        ref_version: u64,
        /// The path of the source dataset being cloned.
        ref_path: String,
        /// The branch name for the clone, if any.
        branch_name: Option<String>,
    },

    /// Update base paths in the dataset (currently only supports adding new bases).
    UpdateBases {
        /// The new base paths to add to the manifest.
        new_bases: Vec<BasePath>,
    },
}
291
/// How an [`Operation::Update`] physically rewrites modified data.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub enum UpdateMode {
    /// rows are deleted in current fragments and rewritten in new fragments.
    /// This is most optimal when the majority of columns are being rewritten
    /// or only a few rows are being updated.
    RewriteRows,

    /// within each fragment, columns are fully rewritten and inserted as new data files.
    /// Old versions of columns are tombstoned. This is most optimal when most rows are affected
    /// but a small subset of columns are affected.
    RewriteColumns,
}
304
305impl std::fmt::Display for Operation {
306    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
307        match self {
308            Self::Append { .. } => write!(f, "Append"),
309            Self::Delete { .. } => write!(f, "Delete"),
310            Self::Overwrite { .. } => write!(f, "Overwrite"),
311            Self::CreateIndex { .. } => write!(f, "CreateIndex"),
312            Self::Rewrite { .. } => write!(f, "Rewrite"),
313            Self::Merge { .. } => write!(f, "Merge"),
314            Self::Restore { .. } => write!(f, "Restore"),
315            Self::ReserveFragments { .. } => write!(f, "ReserveFragments"),
316            Self::Update { .. } => write!(f, "Update"),
317            Self::Project { .. } => write!(f, "Project"),
318            Self::UpdateConfig { .. } => write!(f, "UpdateConfig"),
319            Self::DataReplacement { .. } => write!(f, "DataReplacement"),
320            Self::Clone { .. } => write!(f, "Clone"),
321            Self::UpdateMemWalState { .. } => write!(f, "UpdateMemWalState"),
322            Self::UpdateBases { .. } => write!(f, "UpdateBases"),
323        }
324    }
325}
326
327impl From<&Transaction> for lance_table::format::Transaction {
328    fn from(value: &Transaction) -> Self {
329        let pb_transaction: pb::Transaction = value.into();
330        Self {
331            inner: pb_transaction,
332        }
333    }
334}
335
336impl PartialEq for Operation {
337    fn eq(&self, other: &Self) -> bool {
338        // Many of the operations contain `Vec<T>` where the order of the
339        // elements don't matter. So we need to compare them in a way that
340        // ignores the order of the elements.
341        // TODO: we can make it so the vecs are always constructed in order.
342        // Then we can use `==` instead of `compare_vec`.
343        fn compare_vec<T: PartialEq>(a: &[T], b: &[T]) -> bool {
344            a.len() == b.len() && a.iter().all(|f| b.contains(f))
345        }
346        match (self, other) {
347            (Self::Append { fragments: a }, Self::Append { fragments: b }) => compare_vec(a, b),
348            (
349                Self::Clone {
350                    is_shallow: a_is_shallow,
351                    ref_name: a_ref_name,
352                    ref_version: a_ref_version,
353                    ref_path: a_source_path,
354                    branch_name: a_branch_name,
355                },
356                Self::Clone {
357                    is_shallow: b_is_shallow,
358                    ref_name: b_ref_name,
359                    ref_version: b_ref_version,
360                    ref_path: b_source_path,
361                    branch_name: b_branch_name,
362                },
363            ) => {
364                a_is_shallow == b_is_shallow
365                    && a_ref_name == b_ref_name
366                    && a_ref_version == b_ref_version
367                    && a_source_path == b_source_path
368                    && a_branch_name == b_branch_name
369            }
370            (
371                Self::Delete {
372                    updated_fragments: a_updated,
373                    deleted_fragment_ids: a_deleted,
374                    predicate: a_predicate,
375                },
376                Self::Delete {
377                    updated_fragments: b_updated,
378                    deleted_fragment_ids: b_deleted,
379                    predicate: b_predicate,
380                },
381            ) => {
382                compare_vec(a_updated, b_updated)
383                    && compare_vec(a_deleted, b_deleted)
384                    && a_predicate == b_predicate
385            }
386            (
387                Self::Overwrite {
388                    fragments: a_fragments,
389                    schema: a_schema,
390                    config_upsert_values: a_config,
391                    initial_bases: a_initial,
392                },
393                Self::Overwrite {
394                    fragments: b_fragments,
395                    schema: b_schema,
396                    config_upsert_values: b_config,
397                    initial_bases: b_initial,
398                },
399            ) => {
400                compare_vec(a_fragments, b_fragments)
401                    && a_schema == b_schema
402                    && a_config == b_config
403                    && a_initial == b_initial
404            }
405            (
406                Self::CreateIndex {
407                    new_indices: a_new,
408                    removed_indices: a_removed,
409                },
410                Self::CreateIndex {
411                    new_indices: b_new,
412                    removed_indices: b_removed,
413                },
414            ) => compare_vec(a_new, b_new) && compare_vec(a_removed, b_removed),
415            (
416                Self::Rewrite {
417                    groups: a_groups,
418                    rewritten_indices: a_indices,
419                    frag_reuse_index: a_frag_reuse_index,
420                },
421                Self::Rewrite {
422                    groups: b_groups,
423                    rewritten_indices: b_indices,
424                    frag_reuse_index: b_frag_reuse_index,
425                },
426            ) => {
427                compare_vec(a_groups, b_groups)
428                    && compare_vec(a_indices, b_indices)
429                    && a_frag_reuse_index == b_frag_reuse_index
430            }
431            (
432                Self::Merge {
433                    fragments: a_fragments,
434                    schema: a_schema,
435                },
436                Self::Merge {
437                    fragments: b_fragments,
438                    schema: b_schema,
439                },
440            ) => compare_vec(a_fragments, b_fragments) && a_schema == b_schema,
441            (Self::Restore { version: a }, Self::Restore { version: b }) => a == b,
442            (
443                Self::ReserveFragments { num_fragments: a },
444                Self::ReserveFragments { num_fragments: b },
445            ) => a == b,
446            (
447                Self::Update {
448                    removed_fragment_ids: a_removed,
449                    updated_fragments: a_updated,
450                    new_fragments: a_new,
451                    fields_modified: a_fields,
452                    merged_generations: a_merged_generations,
453                    fields_for_preserving_frag_bitmap: a_fields_for_preserving_frag_bitmap,
454                    update_mode: a_update_mode,
455                    inserted_rows_filter: a_inserted_rows_filter,
456                },
457                Self::Update {
458                    removed_fragment_ids: b_removed,
459                    updated_fragments: b_updated,
460                    new_fragments: b_new,
461                    fields_modified: b_fields,
462                    merged_generations: b_merged_generations,
463                    fields_for_preserving_frag_bitmap: b_fields_for_preserving_frag_bitmap,
464                    update_mode: b_update_mode,
465                    inserted_rows_filter: b_inserted_rows_filter,
466                },
467            ) => {
468                compare_vec(a_removed, b_removed)
469                    && compare_vec(a_updated, b_updated)
470                    && compare_vec(a_new, b_new)
471                    && compare_vec(a_fields, b_fields)
472                    && compare_vec(a_merged_generations, b_merged_generations)
473                    && compare_vec(
474                        a_fields_for_preserving_frag_bitmap,
475                        b_fields_for_preserving_frag_bitmap,
476                    )
477                    && a_update_mode == b_update_mode
478                    && a_inserted_rows_filter == b_inserted_rows_filter
479            }
480            (Self::Project { schema: a }, Self::Project { schema: b }) => a == b,
481            (
482                Self::UpdateConfig {
483                    config_updates: a_config,
484                    table_metadata_updates: a_table_metadata,
485                    schema_metadata_updates: a_schema,
486                    field_metadata_updates: a_field,
487                },
488                Self::UpdateConfig {
489                    config_updates: b_config,
490                    table_metadata_updates: b_table_metadata,
491                    schema_metadata_updates: b_schema,
492                    field_metadata_updates: b_field,
493                },
494            ) => {
495                a_config == b_config
496                    && a_table_metadata == b_table_metadata
497                    && a_schema == b_schema
498                    && a_field == b_field
499            }
500            (
501                Self::DataReplacement { replacements: a },
502                Self::DataReplacement { replacements: b },
503            ) => a.len() == b.len() && a.iter().all(|r| b.contains(r)),
504            // Handle all remaining combinations.
505            // We spell out all combinations explicitly to prevent
506            // us accidentally handling a new case in the wrong way.
507            (Self::Append { .. }, Self::Delete { .. }) => {
508                std::mem::discriminant(self) == std::mem::discriminant(other)
509            }
510            (Self::Append { .. }, Self::Overwrite { .. }) => {
511                std::mem::discriminant(self) == std::mem::discriminant(other)
512            }
513            (Self::Append { .. }, Self::CreateIndex { .. }) => {
514                std::mem::discriminant(self) == std::mem::discriminant(other)
515            }
516            (Self::Append { .. }, Self::Rewrite { .. }) => {
517                std::mem::discriminant(self) == std::mem::discriminant(other)
518            }
519            (Self::Append { .. }, Self::Merge { .. }) => {
520                std::mem::discriminant(self) == std::mem::discriminant(other)
521            }
522            (Self::Append { .. }, Self::Restore { .. }) => {
523                std::mem::discriminant(self) == std::mem::discriminant(other)
524            }
525            (Self::Append { .. }, Self::ReserveFragments { .. }) => {
526                std::mem::discriminant(self) == std::mem::discriminant(other)
527            }
528            (Self::Append { .. }, Self::Update { .. }) => {
529                std::mem::discriminant(self) == std::mem::discriminant(other)
530            }
531            (Self::Append { .. }, Self::Project { .. }) => {
532                std::mem::discriminant(self) == std::mem::discriminant(other)
533            }
534            (Self::Append { .. }, Self::UpdateConfig { .. }) => {
535                std::mem::discriminant(self) == std::mem::discriminant(other)
536            }
537            (Self::Append { .. }, Self::DataReplacement { .. }) => {
538                std::mem::discriminant(self) == std::mem::discriminant(other)
539            }
540            (Self::Append { .. }, Self::UpdateMemWalState { .. }) => {
541                std::mem::discriminant(self) == std::mem::discriminant(other)
542            }
543            (Self::Append { .. }, Self::Clone { .. }) => {
544                std::mem::discriminant(self) == std::mem::discriminant(other)
545            }
546
547            (Self::Delete { .. }, Self::Append { .. }) => {
548                std::mem::discriminant(self) == std::mem::discriminant(other)
549            }
550            (Self::Delete { .. }, Self::Overwrite { .. }) => {
551                std::mem::discriminant(self) == std::mem::discriminant(other)
552            }
553            (Self::Delete { .. }, Self::CreateIndex { .. }) => {
554                std::mem::discriminant(self) == std::mem::discriminant(other)
555            }
556            (Self::Delete { .. }, Self::Rewrite { .. }) => {
557                std::mem::discriminant(self) == std::mem::discriminant(other)
558            }
559            (Self::Delete { .. }, Self::Merge { .. }) => {
560                std::mem::discriminant(self) == std::mem::discriminant(other)
561            }
562            (Self::Delete { .. }, Self::Restore { .. }) => {
563                std::mem::discriminant(self) == std::mem::discriminant(other)
564            }
565            (Self::Delete { .. }, Self::ReserveFragments { .. }) => {
566                std::mem::discriminant(self) == std::mem::discriminant(other)
567            }
568            (Self::Delete { .. }, Self::Update { .. }) => {
569                std::mem::discriminant(self) == std::mem::discriminant(other)
570            }
571            (Self::Delete { .. }, Self::Project { .. }) => {
572                std::mem::discriminant(self) == std::mem::discriminant(other)
573            }
574            (Self::Delete { .. }, Self::UpdateConfig { .. }) => {
575                std::mem::discriminant(self) == std::mem::discriminant(other)
576            }
577            (Self::Delete { .. }, Self::DataReplacement { .. }) => {
578                std::mem::discriminant(self) == std::mem::discriminant(other)
579            }
580            (Self::Delete { .. }, Self::UpdateMemWalState { .. }) => {
581                std::mem::discriminant(self) == std::mem::discriminant(other)
582            }
583            (Self::Delete { .. }, Self::Clone { .. }) => {
584                std::mem::discriminant(self) == std::mem::discriminant(other)
585            }
586
587            (Self::Overwrite { .. }, Self::Append { .. }) => {
588                std::mem::discriminant(self) == std::mem::discriminant(other)
589            }
590            (Self::Overwrite { .. }, Self::Delete { .. }) => {
591                std::mem::discriminant(self) == std::mem::discriminant(other)
592            }
593            (Self::Overwrite { .. }, Self::CreateIndex { .. }) => {
594                std::mem::discriminant(self) == std::mem::discriminant(other)
595            }
596            (Self::Overwrite { .. }, Self::Rewrite { .. }) => {
597                std::mem::discriminant(self) == std::mem::discriminant(other)
598            }
599            (Self::Overwrite { .. }, Self::Merge { .. }) => {
600                std::mem::discriminant(self) == std::mem::discriminant(other)
601            }
602            (Self::Overwrite { .. }, Self::Restore { .. }) => {
603                std::mem::discriminant(self) == std::mem::discriminant(other)
604            }
605            (Self::Overwrite { .. }, Self::ReserveFragments { .. }) => {
606                std::mem::discriminant(self) == std::mem::discriminant(other)
607            }
608            (Self::Overwrite { .. }, Self::Update { .. }) => {
609                std::mem::discriminant(self) == std::mem::discriminant(other)
610            }
611            (Self::Overwrite { .. }, Self::Project { .. }) => {
612                std::mem::discriminant(self) == std::mem::discriminant(other)
613            }
614            (Self::Overwrite { .. }, Self::UpdateConfig { .. }) => {
615                std::mem::discriminant(self) == std::mem::discriminant(other)
616            }
617            (Self::Overwrite { .. }, Self::DataReplacement { .. }) => {
618                std::mem::discriminant(self) == std::mem::discriminant(other)
619            }
620            (Self::Overwrite { .. }, Self::UpdateMemWalState { .. }) => {
621                std::mem::discriminant(self) == std::mem::discriminant(other)
622            }
623            (Self::Overwrite { .. }, Self::Clone { .. }) => {
624                std::mem::discriminant(self) == std::mem::discriminant(other)
625            }
626
627            (Self::CreateIndex { .. }, Self::Append { .. }) => {
628                std::mem::discriminant(self) == std::mem::discriminant(other)
629            }
630            (Self::CreateIndex { .. }, Self::Delete { .. }) => {
631                std::mem::discriminant(self) == std::mem::discriminant(other)
632            }
633            (Self::CreateIndex { .. }, Self::Overwrite { .. }) => {
634                std::mem::discriminant(self) == std::mem::discriminant(other)
635            }
636            (Self::CreateIndex { .. }, Self::Rewrite { .. }) => {
637                std::mem::discriminant(self) == std::mem::discriminant(other)
638            }
639            (Self::CreateIndex { .. }, Self::Merge { .. }) => {
640                std::mem::discriminant(self) == std::mem::discriminant(other)
641            }
642            (Self::CreateIndex { .. }, Self::Restore { .. }) => {
643                std::mem::discriminant(self) == std::mem::discriminant(other)
644            }
645            (Self::CreateIndex { .. }, Self::ReserveFragments { .. }) => {
646                std::mem::discriminant(self) == std::mem::discriminant(other)
647            }
648            (Self::CreateIndex { .. }, Self::Update { .. }) => {
649                std::mem::discriminant(self) == std::mem::discriminant(other)
650            }
651            (Self::CreateIndex { .. }, Self::Project { .. }) => {
652                std::mem::discriminant(self) == std::mem::discriminant(other)
653            }
654            (Self::CreateIndex { .. }, Self::UpdateConfig { .. }) => {
655                std::mem::discriminant(self) == std::mem::discriminant(other)
656            }
657            (Self::CreateIndex { .. }, Self::DataReplacement { .. }) => {
658                std::mem::discriminant(self) == std::mem::discriminant(other)
659            }
660            (Self::CreateIndex { .. }, Self::UpdateMemWalState { .. }) => {
661                std::mem::discriminant(self) == std::mem::discriminant(other)
662            }
663            (Self::CreateIndex { .. }, Self::Clone { .. }) => {
664                std::mem::discriminant(self) == std::mem::discriminant(other)
665            }
666
667            (Self::Rewrite { .. }, Self::Append { .. }) => {
668                std::mem::discriminant(self) == std::mem::discriminant(other)
669            }
670            (Self::Rewrite { .. }, Self::Delete { .. }) => {
671                std::mem::discriminant(self) == std::mem::discriminant(other)
672            }
673            (Self::Rewrite { .. }, Self::Overwrite { .. }) => {
674                std::mem::discriminant(self) == std::mem::discriminant(other)
675            }
676            (Self::Rewrite { .. }, Self::CreateIndex { .. }) => {
677                std::mem::discriminant(self) == std::mem::discriminant(other)
678            }
679            (Self::Rewrite { .. }, Self::Merge { .. }) => {
680                std::mem::discriminant(self) == std::mem::discriminant(other)
681            }
682            (Self::Rewrite { .. }, Self::Restore { .. }) => {
683                std::mem::discriminant(self) == std::mem::discriminant(other)
684            }
685            (Self::Rewrite { .. }, Self::ReserveFragments { .. }) => {
686                std::mem::discriminant(self) == std::mem::discriminant(other)
687            }
688            (Self::Rewrite { .. }, Self::Update { .. }) => {
689                std::mem::discriminant(self) == std::mem::discriminant(other)
690            }
691            (Self::Rewrite { .. }, Self::Project { .. }) => {
692                std::mem::discriminant(self) == std::mem::discriminant(other)
693            }
694            (Self::Rewrite { .. }, Self::UpdateConfig { .. }) => {
695                std::mem::discriminant(self) == std::mem::discriminant(other)
696            }
697            (Self::Rewrite { .. }, Self::DataReplacement { .. }) => {
698                std::mem::discriminant(self) == std::mem::discriminant(other)
699            }
700            (Self::Rewrite { .. }, Self::UpdateMemWalState { .. }) => {
701                std::mem::discriminant(self) == std::mem::discriminant(other)
702            }
703            (Self::Rewrite { .. }, Self::Clone { .. }) => {
704                std::mem::discriminant(self) == std::mem::discriminant(other)
705            }
706
707            (Self::Merge { .. }, Self::Append { .. }) => {
708                std::mem::discriminant(self) == std::mem::discriminant(other)
709            }
710            (Self::Merge { .. }, Self::Delete { .. }) => {
711                std::mem::discriminant(self) == std::mem::discriminant(other)
712            }
713            (Self::Merge { .. }, Self::Overwrite { .. }) => {
714                std::mem::discriminant(self) == std::mem::discriminant(other)
715            }
716            (Self::Merge { .. }, Self::CreateIndex { .. }) => {
717                std::mem::discriminant(self) == std::mem::discriminant(other)
718            }
719            (Self::Merge { .. }, Self::Rewrite { .. }) => {
720                std::mem::discriminant(self) == std::mem::discriminant(other)
721            }
722            (Self::Merge { .. }, Self::Restore { .. }) => {
723                std::mem::discriminant(self) == std::mem::discriminant(other)
724            }
725            (Self::Merge { .. }, Self::ReserveFragments { .. }) => {
726                std::mem::discriminant(self) == std::mem::discriminant(other)
727            }
728            (Self::Merge { .. }, Self::Update { .. }) => {
729                std::mem::discriminant(self) == std::mem::discriminant(other)
730            }
731            (Self::Merge { .. }, Self::Project { .. }) => {
732                std::mem::discriminant(self) == std::mem::discriminant(other)
733            }
734            (Self::Merge { .. }, Self::UpdateConfig { .. }) => {
735                std::mem::discriminant(self) == std::mem::discriminant(other)
736            }
737            (Self::Merge { .. }, Self::DataReplacement { .. }) => {
738                std::mem::discriminant(self) == std::mem::discriminant(other)
739            }
740            (Self::Merge { .. }, Self::UpdateMemWalState { .. }) => {
741                std::mem::discriminant(self) == std::mem::discriminant(other)
742            }
743            (Self::Merge { .. }, Self::Clone { .. }) => {
744                std::mem::discriminant(self) == std::mem::discriminant(other)
745            }
746
747            (Self::Restore { .. }, Self::Append { .. }) => {
748                std::mem::discriminant(self) == std::mem::discriminant(other)
749            }
750            (Self::Restore { .. }, Self::Delete { .. }) => {
751                std::mem::discriminant(self) == std::mem::discriminant(other)
752            }
753            (Self::Restore { .. }, Self::Overwrite { .. }) => {
754                std::mem::discriminant(self) == std::mem::discriminant(other)
755            }
756            (Self::Restore { .. }, Self::CreateIndex { .. }) => {
757                std::mem::discriminant(self) == std::mem::discriminant(other)
758            }
759            (Self::Restore { .. }, Self::Rewrite { .. }) => {
760                std::mem::discriminant(self) == std::mem::discriminant(other)
761            }
762            (Self::Restore { .. }, Self::Merge { .. }) => {
763                std::mem::discriminant(self) == std::mem::discriminant(other)
764            }
765            (Self::Restore { .. }, Self::ReserveFragments { .. }) => {
766                std::mem::discriminant(self) == std::mem::discriminant(other)
767            }
768            (Self::Restore { .. }, Self::Update { .. }) => {
769                std::mem::discriminant(self) == std::mem::discriminant(other)
770            }
771            (Self::Restore { .. }, Self::Project { .. }) => {
772                std::mem::discriminant(self) == std::mem::discriminant(other)
773            }
774            (Self::Restore { .. }, Self::UpdateConfig { .. }) => {
775                std::mem::discriminant(self) == std::mem::discriminant(other)
776            }
777            (Self::Restore { .. }, Self::DataReplacement { .. }) => {
778                std::mem::discriminant(self) == std::mem::discriminant(other)
779            }
780            (Self::Restore { .. }, Self::UpdateMemWalState { .. }) => {
781                std::mem::discriminant(self) == std::mem::discriminant(other)
782            }
783            (Self::Restore { .. }, Self::Clone { .. }) => {
784                std::mem::discriminant(self) == std::mem::discriminant(other)
785            }
786
787            (Self::ReserveFragments { .. }, Self::Append { .. }) => {
788                std::mem::discriminant(self) == std::mem::discriminant(other)
789            }
790            (Self::ReserveFragments { .. }, Self::Delete { .. }) => {
791                std::mem::discriminant(self) == std::mem::discriminant(other)
792            }
793            (Self::ReserveFragments { .. }, Self::Overwrite { .. }) => {
794                std::mem::discriminant(self) == std::mem::discriminant(other)
795            }
796            (Self::ReserveFragments { .. }, Self::CreateIndex { .. }) => {
797                std::mem::discriminant(self) == std::mem::discriminant(other)
798            }
799            (Self::ReserveFragments { .. }, Self::Rewrite { .. }) => {
800                std::mem::discriminant(self) == std::mem::discriminant(other)
801            }
802            (Self::ReserveFragments { .. }, Self::Merge { .. }) => {
803                std::mem::discriminant(self) == std::mem::discriminant(other)
804            }
805            (Self::ReserveFragments { .. }, Self::Restore { .. }) => {
806                std::mem::discriminant(self) == std::mem::discriminant(other)
807            }
808            (Self::ReserveFragments { .. }, Self::Update { .. }) => {
809                std::mem::discriminant(self) == std::mem::discriminant(other)
810            }
811            (Self::ReserveFragments { .. }, Self::Project { .. }) => {
812                std::mem::discriminant(self) == std::mem::discriminant(other)
813            }
814            (Self::ReserveFragments { .. }, Self::UpdateConfig { .. }) => {
815                std::mem::discriminant(self) == std::mem::discriminant(other)
816            }
817            (Self::ReserveFragments { .. }, Self::DataReplacement { .. }) => {
818                std::mem::discriminant(self) == std::mem::discriminant(other)
819            }
820            (Self::ReserveFragments { .. }, Self::UpdateMemWalState { .. }) => {
821                std::mem::discriminant(self) == std::mem::discriminant(other)
822            }
823            (Self::ReserveFragments { .. }, Self::Clone { .. }) => {
824                std::mem::discriminant(self) == std::mem::discriminant(other)
825            }
826
827            (Self::Update { .. }, Self::Append { .. }) => {
828                std::mem::discriminant(self) == std::mem::discriminant(other)
829            }
830            (Self::Update { .. }, Self::Delete { .. }) => {
831                std::mem::discriminant(self) == std::mem::discriminant(other)
832            }
833            (Self::Update { .. }, Self::Overwrite { .. }) => {
834                std::mem::discriminant(self) == std::mem::discriminant(other)
835            }
836            (Self::Update { .. }, Self::CreateIndex { .. }) => {
837                std::mem::discriminant(self) == std::mem::discriminant(other)
838            }
839            (Self::Update { .. }, Self::Rewrite { .. }) => {
840                std::mem::discriminant(self) == std::mem::discriminant(other)
841            }
842            (Self::Update { .. }, Self::Merge { .. }) => {
843                std::mem::discriminant(self) == std::mem::discriminant(other)
844            }
845            (Self::Update { .. }, Self::Restore { .. }) => {
846                std::mem::discriminant(self) == std::mem::discriminant(other)
847            }
848            (Self::Update { .. }, Self::ReserveFragments { .. }) => {
849                std::mem::discriminant(self) == std::mem::discriminant(other)
850            }
851            (Self::Update { .. }, Self::Project { .. }) => {
852                std::mem::discriminant(self) == std::mem::discriminant(other)
853            }
854            (Self::Update { .. }, Self::UpdateConfig { .. }) => {
855                std::mem::discriminant(self) == std::mem::discriminant(other)
856            }
857            (Self::Update { .. }, Self::DataReplacement { .. }) => {
858                std::mem::discriminant(self) == std::mem::discriminant(other)
859            }
860            (Self::Update { .. }, Self::UpdateMemWalState { .. }) => {
861                std::mem::discriminant(self) == std::mem::discriminant(other)
862            }
863            (Self::Update { .. }, Self::Clone { .. }) => {
864                std::mem::discriminant(self) == std::mem::discriminant(other)
865            }
866
867            (Self::Project { .. }, Self::Append { .. }) => {
868                std::mem::discriminant(self) == std::mem::discriminant(other)
869            }
870            (Self::Project { .. }, Self::Delete { .. }) => {
871                std::mem::discriminant(self) == std::mem::discriminant(other)
872            }
873            (Self::Project { .. }, Self::Overwrite { .. }) => {
874                std::mem::discriminant(self) == std::mem::discriminant(other)
875            }
876            (Self::Project { .. }, Self::CreateIndex { .. }) => {
877                std::mem::discriminant(self) == std::mem::discriminant(other)
878            }
879            (Self::Project { .. }, Self::Rewrite { .. }) => {
880                std::mem::discriminant(self) == std::mem::discriminant(other)
881            }
882            (Self::Project { .. }, Self::Merge { .. }) => {
883                std::mem::discriminant(self) == std::mem::discriminant(other)
884            }
885            (Self::Project { .. }, Self::Restore { .. }) => {
886                std::mem::discriminant(self) == std::mem::discriminant(other)
887            }
888            (Self::Project { .. }, Self::ReserveFragments { .. }) => {
889                std::mem::discriminant(self) == std::mem::discriminant(other)
890            }
891            (Self::Project { .. }, Self::Update { .. }) => {
892                std::mem::discriminant(self) == std::mem::discriminant(other)
893            }
894            (Self::Project { .. }, Self::UpdateConfig { .. }) => {
895                std::mem::discriminant(self) == std::mem::discriminant(other)
896            }
897            (Self::Project { .. }, Self::DataReplacement { .. }) => {
898                std::mem::discriminant(self) == std::mem::discriminant(other)
899            }
900            (Self::Project { .. }, Self::UpdateMemWalState { .. }) => {
901                std::mem::discriminant(self) == std::mem::discriminant(other)
902            }
903            (Self::Project { .. }, Self::Clone { .. }) => {
904                std::mem::discriminant(self) == std::mem::discriminant(other)
905            }
906
907            (Self::UpdateConfig { .. }, Self::Append { .. }) => {
908                std::mem::discriminant(self) == std::mem::discriminant(other)
909            }
910            (Self::UpdateConfig { .. }, Self::Delete { .. }) => {
911                std::mem::discriminant(self) == std::mem::discriminant(other)
912            }
913            (Self::UpdateConfig { .. }, Self::Overwrite { .. }) => {
914                std::mem::discriminant(self) == std::mem::discriminant(other)
915            }
916            (Self::UpdateConfig { .. }, Self::CreateIndex { .. }) => {
917                std::mem::discriminant(self) == std::mem::discriminant(other)
918            }
919            (Self::UpdateConfig { .. }, Self::Rewrite { .. }) => {
920                std::mem::discriminant(self) == std::mem::discriminant(other)
921            }
922            (Self::UpdateConfig { .. }, Self::Merge { .. }) => {
923                std::mem::discriminant(self) == std::mem::discriminant(other)
924            }
925            (Self::UpdateConfig { .. }, Self::Restore { .. }) => {
926                std::mem::discriminant(self) == std::mem::discriminant(other)
927            }
928            (Self::UpdateConfig { .. }, Self::ReserveFragments { .. }) => {
929                std::mem::discriminant(self) == std::mem::discriminant(other)
930            }
931            (Self::UpdateConfig { .. }, Self::Update { .. }) => {
932                std::mem::discriminant(self) == std::mem::discriminant(other)
933            }
934            (Self::UpdateConfig { .. }, Self::Project { .. }) => {
935                std::mem::discriminant(self) == std::mem::discriminant(other)
936            }
937            (Self::UpdateConfig { .. }, Self::DataReplacement { .. }) => {
938                std::mem::discriminant(self) == std::mem::discriminant(other)
939            }
940            (Self::UpdateConfig { .. }, Self::UpdateMemWalState { .. }) => {
941                std::mem::discriminant(self) == std::mem::discriminant(other)
942            }
943            (Self::UpdateConfig { .. }, Self::Clone { .. }) => {
944                std::mem::discriminant(self) == std::mem::discriminant(other)
945            }
946
947            (Self::DataReplacement { .. }, Self::Append { .. }) => {
948                std::mem::discriminant(self) == std::mem::discriminant(other)
949            }
950            (Self::DataReplacement { .. }, Self::Delete { .. }) => {
951                std::mem::discriminant(self) == std::mem::discriminant(other)
952            }
953            (Self::DataReplacement { .. }, Self::Overwrite { .. }) => {
954                std::mem::discriminant(self) == std::mem::discriminant(other)
955            }
956            (Self::DataReplacement { .. }, Self::CreateIndex { .. }) => {
957                std::mem::discriminant(self) == std::mem::discriminant(other)
958            }
959            (Self::DataReplacement { .. }, Self::Rewrite { .. }) => {
960                std::mem::discriminant(self) == std::mem::discriminant(other)
961            }
962            (Self::DataReplacement { .. }, Self::Merge { .. }) => {
963                std::mem::discriminant(self) == std::mem::discriminant(other)
964            }
965            (Self::DataReplacement { .. }, Self::Restore { .. }) => {
966                std::mem::discriminant(self) == std::mem::discriminant(other)
967            }
968            (Self::DataReplacement { .. }, Self::ReserveFragments { .. }) => {
969                std::mem::discriminant(self) == std::mem::discriminant(other)
970            }
971            (Self::DataReplacement { .. }, Self::Update { .. }) => {
972                std::mem::discriminant(self) == std::mem::discriminant(other)
973            }
974            (Self::DataReplacement { .. }, Self::Project { .. }) => {
975                std::mem::discriminant(self) == std::mem::discriminant(other)
976            }
977            (Self::DataReplacement { .. }, Self::UpdateConfig { .. }) => {
978                std::mem::discriminant(self) == std::mem::discriminant(other)
979            }
980            (Self::DataReplacement { .. }, Self::UpdateMemWalState { .. }) => {
981                std::mem::discriminant(self) == std::mem::discriminant(other)
982            }
983            (Self::DataReplacement { .. }, Self::Clone { .. }) => {
984                std::mem::discriminant(self) == std::mem::discriminant(other)
985            }
986
987            (Self::UpdateMemWalState { .. }, Self::Append { .. }) => {
988                std::mem::discriminant(self) == std::mem::discriminant(other)
989            }
990            (Self::UpdateMemWalState { .. }, Self::Delete { .. }) => {
991                std::mem::discriminant(self) == std::mem::discriminant(other)
992            }
993            (Self::UpdateMemWalState { .. }, Self::Overwrite { .. }) => {
994                std::mem::discriminant(self) == std::mem::discriminant(other)
995            }
996            (Self::UpdateMemWalState { .. }, Self::CreateIndex { .. }) => {
997                std::mem::discriminant(self) == std::mem::discriminant(other)
998            }
999            (Self::UpdateMemWalState { .. }, Self::Rewrite { .. }) => {
1000                std::mem::discriminant(self) == std::mem::discriminant(other)
1001            }
1002            (Self::UpdateMemWalState { .. }, Self::Merge { .. }) => {
1003                std::mem::discriminant(self) == std::mem::discriminant(other)
1004            }
1005            (Self::UpdateMemWalState { .. }, Self::Restore { .. }) => {
1006                std::mem::discriminant(self) == std::mem::discriminant(other)
1007            }
1008            (Self::UpdateMemWalState { .. }, Self::ReserveFragments { .. }) => {
1009                std::mem::discriminant(self) == std::mem::discriminant(other)
1010            }
1011            (Self::UpdateMemWalState { .. }, Self::Update { .. }) => {
1012                std::mem::discriminant(self) == std::mem::discriminant(other)
1013            }
1014            (Self::UpdateMemWalState { .. }, Self::Project { .. }) => {
1015                std::mem::discriminant(self) == std::mem::discriminant(other)
1016            }
1017            (Self::UpdateMemWalState { .. }, Self::UpdateConfig { .. }) => {
1018                std::mem::discriminant(self) == std::mem::discriminant(other)
1019            }
1020            (Self::UpdateMemWalState { .. }, Self::DataReplacement { .. }) => {
1021                std::mem::discriminant(self) == std::mem::discriminant(other)
1022            }
1023            (Self::UpdateMemWalState { .. }, Self::Clone { .. }) => {
1024                std::mem::discriminant(self) == std::mem::discriminant(other)
1025            }
1026            (
1027                Self::UpdateMemWalState {
1028                    merged_generations: a_merged,
1029                },
1030                Self::UpdateMemWalState {
1031                    merged_generations: b_merged,
1032                },
1033            ) => compare_vec(a_merged, b_merged),
1034            (Self::Clone { .. }, Self::Append { .. }) => {
1035                std::mem::discriminant(self) == std::mem::discriminant(other)
1036            }
1037            (Self::Clone { .. }, Self::Delete { .. }) => {
1038                std::mem::discriminant(self) == std::mem::discriminant(other)
1039            }
1040            (Self::Clone { .. }, Self::Overwrite { .. }) => {
1041                std::mem::discriminant(self) == std::mem::discriminant(other)
1042            }
1043            (Self::Clone { .. }, Self::CreateIndex { .. }) => {
1044                std::mem::discriminant(self) == std::mem::discriminant(other)
1045            }
1046            (Self::Clone { .. }, Self::Rewrite { .. }) => {
1047                std::mem::discriminant(self) == std::mem::discriminant(other)
1048            }
1049            (Self::Clone { .. }, Self::Merge { .. }) => {
1050                std::mem::discriminant(self) == std::mem::discriminant(other)
1051            }
1052            (Self::Clone { .. }, Self::Restore { .. }) => {
1053                std::mem::discriminant(self) == std::mem::discriminant(other)
1054            }
1055            (Self::Clone { .. }, Self::ReserveFragments { .. }) => {
1056                std::mem::discriminant(self) == std::mem::discriminant(other)
1057            }
1058            (Self::Clone { .. }, Self::Update { .. }) => {
1059                std::mem::discriminant(self) == std::mem::discriminant(other)
1060            }
1061            (Self::Clone { .. }, Self::Project { .. }) => {
1062                std::mem::discriminant(self) == std::mem::discriminant(other)
1063            }
1064            (Self::Clone { .. }, Self::UpdateConfig { .. }) => {
1065                std::mem::discriminant(self) == std::mem::discriminant(other)
1066            }
1067            (Self::Clone { .. }, Self::DataReplacement { .. }) => {
1068                std::mem::discriminant(self) == std::mem::discriminant(other)
1069            }
1070            (Self::Clone { .. }, Self::UpdateMemWalState { .. }) => {
1071                std::mem::discriminant(self) == std::mem::discriminant(other)
1072            }
1073
1074            (Self::UpdateBases { new_bases: a }, Self::UpdateBases { new_bases: b }) => {
1075                compare_vec(a, b)
1076            }
1077
1078            (Self::UpdateBases { .. }, Self::Append { .. }) => {
1079                std::mem::discriminant(self) == std::mem::discriminant(other)
1080            }
1081            (Self::UpdateBases { .. }, Self::Delete { .. }) => {
1082                std::mem::discriminant(self) == std::mem::discriminant(other)
1083            }
1084            (Self::UpdateBases { .. }, Self::Overwrite { .. }) => {
1085                std::mem::discriminant(self) == std::mem::discriminant(other)
1086            }
1087            (Self::UpdateBases { .. }, Self::CreateIndex { .. }) => {
1088                std::mem::discriminant(self) == std::mem::discriminant(other)
1089            }
1090            (Self::UpdateBases { .. }, Self::Rewrite { .. }) => {
1091                std::mem::discriminant(self) == std::mem::discriminant(other)
1092            }
1093            (Self::UpdateBases { .. }, Self::Merge { .. }) => {
1094                std::mem::discriminant(self) == std::mem::discriminant(other)
1095            }
1096            (Self::UpdateBases { .. }, Self::Restore { .. }) => {
1097                std::mem::discriminant(self) == std::mem::discriminant(other)
1098            }
1099            (Self::UpdateBases { .. }, Self::ReserveFragments { .. }) => {
1100                std::mem::discriminant(self) == std::mem::discriminant(other)
1101            }
1102            (Self::UpdateBases { .. }, Self::Update { .. }) => {
1103                std::mem::discriminant(self) == std::mem::discriminant(other)
1104            }
1105            (Self::UpdateBases { .. }, Self::Project { .. }) => {
1106                std::mem::discriminant(self) == std::mem::discriminant(other)
1107            }
1108            (Self::UpdateBases { .. }, Self::UpdateConfig { .. }) => {
1109                std::mem::discriminant(self) == std::mem::discriminant(other)
1110            }
1111            (Self::UpdateBases { .. }, Self::DataReplacement { .. }) => {
1112                std::mem::discriminant(self) == std::mem::discriminant(other)
1113            }
1114            (Self::UpdateBases { .. }, Self::UpdateMemWalState { .. }) => {
1115                std::mem::discriminant(self) == std::mem::discriminant(other)
1116            }
1117            (Self::UpdateBases { .. }, Self::Clone { .. }) => {
1118                std::mem::discriminant(self) == std::mem::discriminant(other)
1119            }
1120
1121            (Self::Append { .. }, Self::UpdateBases { .. }) => {
1122                std::mem::discriminant(self) == std::mem::discriminant(other)
1123            }
1124            (Self::Delete { .. }, Self::UpdateBases { .. }) => {
1125                std::mem::discriminant(self) == std::mem::discriminant(other)
1126            }
1127            (Self::Overwrite { .. }, Self::UpdateBases { .. }) => {
1128                std::mem::discriminant(self) == std::mem::discriminant(other)
1129            }
1130            (Self::CreateIndex { .. }, Self::UpdateBases { .. }) => {
1131                std::mem::discriminant(self) == std::mem::discriminant(other)
1132            }
1133            (Self::Rewrite { .. }, Self::UpdateBases { .. }) => {
1134                std::mem::discriminant(self) == std::mem::discriminant(other)
1135            }
1136            (Self::Merge { .. }, Self::UpdateBases { .. }) => {
1137                std::mem::discriminant(self) == std::mem::discriminant(other)
1138            }
1139            (Self::Restore { .. }, Self::UpdateBases { .. }) => {
1140                std::mem::discriminant(self) == std::mem::discriminant(other)
1141            }
1142            (Self::ReserveFragments { .. }, Self::UpdateBases { .. }) => {
1143                std::mem::discriminant(self) == std::mem::discriminant(other)
1144            }
1145            (Self::Update { .. }, Self::UpdateBases { .. }) => {
1146                std::mem::discriminant(self) == std::mem::discriminant(other)
1147            }
1148            (Self::Project { .. }, Self::UpdateBases { .. }) => {
1149                std::mem::discriminant(self) == std::mem::discriminant(other)
1150            }
1151            (Self::UpdateConfig { .. }, Self::UpdateBases { .. }) => {
1152                std::mem::discriminant(self) == std::mem::discriminant(other)
1153            }
1154            (Self::DataReplacement { .. }, Self::UpdateBases { .. }) => {
1155                std::mem::discriminant(self) == std::mem::discriminant(other)
1156            }
1157            (Self::UpdateMemWalState { .. }, Self::UpdateBases { .. }) => {
1158                std::mem::discriminant(self) == std::mem::discriminant(other)
1159            }
1160            (Self::Clone { .. }, Self::UpdateBases { .. }) => {
1161                std::mem::discriminant(self) == std::mem::discriminant(other)
1162            }
1163        }
1164    }
1165}
1166
/// Records that an index was replaced during a rewrite, mapping the index's
/// previous UUID to the UUID of the index that supersedes it.
#[derive(Debug, Clone, PartialEq)]
pub struct RewrittenIndex {
    /// UUID of the index before the rewrite.
    pub old_id: Uuid,
    /// UUID of the index that replaces `old_id`.
    pub new_id: Uuid,
    /// Opaque, index-type-specific details of the new index (protobuf `Any`).
    pub new_index_details: prost_types::Any,
    /// Format version of the new index.
    pub new_index_version: u32,
}
1174
1175impl DeepSizeOf for RewrittenIndex {
1176    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
1177        self.new_index_details
1178            .type_url
1179            .deep_size_of_children(context)
1180            + self.new_index_details.value.deep_size_of_children(context)
1181    }
1182}
1183
/// A set of fragments replaced as a unit by a rewrite operation.
#[derive(Debug, Clone, DeepSizeOf)]
pub struct RewriteGroup {
    /// Fragments removed by the rewrite.
    pub old_fragments: Vec<Fragment>,
    /// Fragments that replace `old_fragments`.
    pub new_fragments: Vec<Fragment>,
}
1189
1190impl PartialEq for RewriteGroup {
1191    fn eq(&self, other: &Self) -> bool {
1192        fn compare_vec<T: PartialEq>(a: &[T], b: &[T]) -> bool {
1193            a.len() == b.len() && a.iter().all(|f| b.contains(f))
1194        }
1195        compare_vec(&self.old_fragments, &other.old_fragments)
1196            && compare_vec(&self.new_fragments, &other.new_fragments)
1197    }
1198}
1199
1200impl Operation {
1201    /// Returns the config keys that have been upserted by this operation.
1202    fn get_upsert_config_keys(&self) -> Vec<String> {
1203        match self {
1204            Self::Overwrite {
1205                config_upsert_values: Some(upsert_values),
1206                ..
1207            } => {
1208                let vec: Vec<String> = upsert_values.keys().cloned().collect();
1209                vec
1210            }
1211            Self::UpdateConfig {
1212                config_updates: Some(config_updates),
1213                ..
1214            } => config_updates
1215                .update_entries
1216                .iter()
1217                .filter_map(|entry| {
1218                    if entry.value.is_some() {
1219                        Some(entry.key.clone())
1220                    } else {
1221                        None
1222                    }
1223                })
1224                .collect(),
1225            _ => Vec::<String>::new(),
1226        }
1227    }
1228
1229    /// Returns the config keys that have been deleted by this operation.
1230    fn get_delete_config_keys(&self) -> Vec<String> {
1231        match self {
1232            Self::UpdateConfig {
1233                config_updates: Some(config_updates),
1234                ..
1235            } => config_updates
1236                .update_entries
1237                .iter()
1238                .filter_map(|entry| {
1239                    if entry.value.is_none() {
1240                        Some(entry.key.clone())
1241                    } else {
1242                        None
1243                    }
1244                })
1245                .collect(),
1246            _ => Vec::<String>::new(),
1247        }
1248    }
1249
1250    pub(crate) fn modifies_same_metadata(&self, other: &Self) -> bool {
1251        match (self, other) {
1252            (
1253                Self::UpdateConfig {
1254                    schema_metadata_updates,
1255                    field_metadata_updates,
1256                    ..
1257                },
1258                Self::UpdateConfig {
1259                    schema_metadata_updates: other_schema_metadata,
1260                    field_metadata_updates: other_field_metadata,
1261                    ..
1262                },
1263            ) => {
1264                if schema_metadata_updates.is_some() && other_schema_metadata.is_some() {
1265                    return true;
1266                }
1267                if !field_metadata_updates.is_empty() && !other_field_metadata.is_empty() {
1268                    for field in field_metadata_updates.keys() {
1269                        if other_field_metadata.contains_key(field) {
1270                            return true;
1271                        }
1272                    }
1273                }
1274                false
1275            }
1276            _ => false,
1277        }
1278    }
1279
1280    /// Check whether another operation upserts a key that is referenced by another operation
1281    pub(crate) fn upsert_key_conflict(&self, other: &Self) -> bool {
1282        let self_upsert_keys = self.get_upsert_config_keys();
1283        let other_upsert_keys = other.get_upsert_config_keys();
1284
1285        let self_delete_keys = self.get_delete_config_keys();
1286        let other_delete_keys = other.get_delete_config_keys();
1287
1288        self_upsert_keys
1289            .iter()
1290            .any(|x| other_upsert_keys.contains(x) || other_delete_keys.contains(x))
1291            || other_upsert_keys
1292                .iter()
1293                .any(|x| self_upsert_keys.contains(x) || self_delete_keys.contains(x))
1294    }
1295
    /// Returns the human-readable name of this operation variant.
    ///
    /// Used elsewhere in this file when formatting error messages that refer
    /// to the operation being applied.
    pub fn name(&self) -> &str {
        match self {
            Self::Append { .. } => "Append",
            Self::Delete { .. } => "Delete",
            Self::Overwrite { .. } => "Overwrite",
            Self::CreateIndex { .. } => "CreateIndex",
            Self::Rewrite { .. } => "Rewrite",
            Self::Merge { .. } => "Merge",
            Self::ReserveFragments { .. } => "ReserveFragments",
            Self::Restore { .. } => "Restore",
            Self::Update { .. } => "Update",
            Self::Project { .. } => "Project",
            Self::UpdateConfig { .. } => "UpdateConfig",
            Self::DataReplacement { .. } => "DataReplacement",
            Self::UpdateMemWalState { .. } => "UpdateMemWalState",
            Self::Clone { .. } => "Clone",
            Self::UpdateBases { .. } => "UpdateBases",
        }
    }
1315}
1316
1317/// Helper function to apply UpdateMap changes to a HashMap<String, String>
1318fn apply_update_map(
1319    target: &mut std::collections::HashMap<String, String>,
1320    update_map: &UpdateMap,
1321) {
1322    if update_map.replace {
1323        // Full replacement - clear existing and replace with new entries that have values
1324        target.clear();
1325        for entry in &update_map.update_entries {
1326            if let Some(value) = &entry.value {
1327                target.insert(entry.key.clone(), value.clone());
1328            }
1329        }
1330    } else {
1331        // Incremental update - merge entries
1332        for entry in &update_map.update_entries {
1333            if let Some(value) = &entry.value {
1334                target.insert(entry.key.clone(), value.clone());
1335            } else {
1336                target.remove(&entry.key);
1337            }
1338        }
1339    }
1340}
1341
1342/// Helper function to translate old-style config updates to new UpdateMap format
1343pub fn translate_config_updates(
1344    upsert_values: &std::collections::HashMap<String, String>,
1345    delete_keys: &[String],
1346) -> UpdateMap {
1347    let mut update_entries = Vec::new();
1348
1349    // Add upsert entries (with values)
1350    for (key, value) in upsert_values {
1351        update_entries.push(UpdateMapEntry {
1352            key: key.clone(),
1353            value: Some(value.clone()),
1354        });
1355    }
1356
1357    // Add delete entries (without values)
1358    for key in delete_keys {
1359        update_entries.push(UpdateMapEntry {
1360            key: key.clone(),
1361            value: None,
1362        });
1363    }
1364
1365    UpdateMap {
1366        update_entries,
1367        replace: false, // Old style was always incremental
1368    }
1369}
1370
1371/// Helper function to translate old-style schema metadata to new UpdateMap format
1372pub fn translate_schema_metadata_updates(
1373    schema_metadata: &std::collections::HashMap<String, String>,
1374) -> UpdateMap {
1375    let update_entries = schema_metadata
1376        .iter()
1377        .map(|(key, value)| UpdateMapEntry {
1378            key: key.clone(),
1379            value: Some(value.clone()),
1380        })
1381        .collect();
1382
1383    UpdateMap {
1384        update_entries,
1385        replace: true, // Old style schema metadata was full replacement
1386    }
1387}
1388
1389impl From<&UpdateMap> for pb::transaction::UpdateMap {
1390    fn from(update_map: &UpdateMap) -> Self {
1391        Self {
1392            update_entries: update_map
1393                .update_entries
1394                .iter()
1395                .map(|entry| pb::transaction::UpdateMapEntry {
1396                    key: entry.key.clone(),
1397                    value: entry.value.clone(),
1398                })
1399                .collect(),
1400            replace: update_map.replace,
1401        }
1402    }
1403}
1404
1405impl From<&pb::transaction::UpdateMap> for UpdateMap {
1406    fn from(pb_update_map: &pb::transaction::UpdateMap) -> Self {
1407        Self {
1408            update_entries: pb_update_map
1409                .update_entries
1410                .iter()
1411                .map(|entry| UpdateMapEntry {
1412                    key: entry.key.clone(),
1413                    value: entry.value.clone(),
1414                })
1415                .collect(),
1416            replace: pb_update_map.replace,
1417        }
1418    }
1419}
1420
/// Builder for `Transaction`, allowing optional fields to be set fluently
/// without requiring a `mut` binding at the call site.
pub struct TransactionBuilder {
    // Dataset version the transaction was built against (see `Transaction::read_version`).
    read_version: u64,
    // uuid is optional for builder since it can autogenerate
    uuid: Option<String>,
    // The operation this transaction applies.
    operation: Operation,
    // Optional tag carried into the built `Transaction`.
    tag: Option<String>,
    // Optional user-supplied properties stored with the transaction.
    transaction_properties: Option<Arc<HashMap<String, String>>>,
}
1430
1431impl TransactionBuilder {
1432    pub fn new(read_version: u64, operation: Operation) -> Self {
1433        Self {
1434            read_version,
1435            uuid: None,
1436            operation,
1437            tag: None,
1438            transaction_properties: None,
1439        }
1440    }
1441
1442    pub fn uuid(mut self, uuid: String) -> Self {
1443        self.uuid = Some(uuid);
1444        self
1445    }
1446
1447    pub fn tag(mut self, tag: Option<String>) -> Self {
1448        self.tag = tag;
1449        self
1450    }
1451
1452    pub fn transaction_properties(
1453        mut self,
1454        transaction_properties: Option<Arc<HashMap<String, String>>>,
1455    ) -> Self {
1456        self.transaction_properties = transaction_properties;
1457        self
1458    }
1459
1460    pub fn build(self) -> Transaction {
1461        let uuid = self
1462            .uuid
1463            .unwrap_or_else(|| Uuid::new_v4().hyphenated().to_string());
1464        Transaction {
1465            read_version: self.read_version,
1466            uuid,
1467            operation: self.operation,
1468            tag: self.tag,
1469            transaction_properties: self.transaction_properties,
1470        }
1471    }
1472}
1473
1474impl Transaction {
    /// Creates a transaction with an auto-generated UUID and no tag.
    pub fn new_from_version(read_version: u64, operation: Operation) -> Self {
        TransactionBuilder::new(read_version, operation).build()
    }
1478
    /// Creates a transaction with an auto-generated UUID and an optional tag.
    pub fn new(read_version: u64, operation: Operation, tag: Option<String>) -> Self {
        TransactionBuilder::new(read_version, operation)
            .tag(tag)
            .build()
    }
1484
1485    fn fragments_with_ids<'a, T>(
1486        new_fragments: T,
1487        fragment_id: &'a mut u64,
1488    ) -> impl Iterator<Item = Fragment> + 'a
1489    where
1490        T: IntoIterator<Item = Fragment> + 'a,
1491    {
1492        new_fragments.into_iter().map(move |mut f| {
1493            if f.id == 0 {
1494                f.id = *fragment_id;
1495                *fragment_id += 1;
1496            }
1497            f
1498        })
1499    }
1500
1501    fn data_storage_format_from_files(
1502        fragments: &[Fragment],
1503        user_requested: Option<LanceFileVersion>,
1504    ) -> Result<DataStorageFormat> {
1505        if let Some(file_version) = Fragment::try_infer_version(fragments)? {
1506            // Ensure user-requested matches data files
1507            if let Some(user_requested) = user_requested
1508                && user_requested != file_version
1509            {
1510                return Err(Error::invalid_input(format!(
1511                    "User requested data storage version ({}) does not match version in data files ({})",
1512                    user_requested, file_version
1513                )));
1514            }
1515            Ok(DataStorageFormat::new(file_version))
1516        } else {
1517            // If no files use user-requested or default
1518            Ok(user_requested
1519                .map(DataStorageFormat::new)
1520                .unwrap_or_default())
1521        }
1522    }
1523
    /// Rebuilds a manifest by re-reading the manifest of an older `version`
    /// and adapting it for a new commit (used when restoring a dataset to a
    /// previous version).
    ///
    /// The restored manifest keeps the old version's contents but receives a
    /// fresh timestamp, points at the new transaction file (`tx_path`), and
    /// carries forward the current manifest's `max_fragment_id` so fragment
    /// ids are never reused. Returns the manifest together with the index
    /// metadata read from that version.
    pub(crate) async fn restore_old_manifest(
        object_store: &ObjectStore,
        commit_handler: &dyn CommitHandler,
        base_path: &Path,
        version: u64,
        config: &ManifestWriteConfig,
        tx_path: &str,
        current_manifest: &Manifest,
    ) -> Result<(Manifest, Vec<IndexMetadata>)> {
        // Locate and read the manifest file for the requested version.
        let location = commit_handler
            .resolve_version_location(base_path, version, &object_store.inner)
            .await?;
        let mut manifest = read_manifest(object_store, &location.path, location.size).await?;
        manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
        manifest.transaction_file = Some(tx_path.to_string());
        let indices = read_manifest_indexes(object_store, &location, &manifest).await?;
        // Never let max_fragment_id go backwards relative to the current
        // manifest, even though we are restoring older contents.
        manifest.max_fragment_id = manifest
            .max_fragment_id
            .max(current_manifest.max_fragment_id);
        Ok((manifest, indices))
    }
1545
1546    /// Create a new manifest from the current manifest and the transaction.
1547    ///
1548    /// `current_manifest` should only be None if the dataset does not yet exist.
1549    pub(crate) fn build_manifest(
1550        &self,
1551        current_manifest: Option<&Manifest>,
1552        current_indices: Vec<IndexMetadata>,
1553        transaction_file_path: &str,
1554        config: &ManifestWriteConfig,
1555    ) -> Result<(Manifest, Vec<IndexMetadata>)> {
1556        if config.use_stable_row_ids
1557            && current_manifest
1558                .map(|m| !m.uses_stable_row_ids())
1559                .unwrap_or_default()
1560        {
1561            return Err(Error::not_supported_source(
1562                "Cannot enable stable row ids on existing dataset".into(),
1563            ));
1564        }
1565        let mut reference_paths = match current_manifest {
1566            Some(m) => m.base_paths.clone(),
1567            None => HashMap::new(),
1568        };
1569
1570        if let Operation::Overwrite {
1571            initial_bases: Some(initial_bases),
1572            ..
1573        } = &self.operation
1574        {
1575            if current_manifest.is_none() {
1576                // CREATE mode: registering base paths
1577                // Base IDs should have been assigned during write operation
1578                // Validate uniqueness and insert them into the manifest
1579                for base_path in initial_bases.iter() {
1580                    if reference_paths.contains_key(&base_path.id) {
1581                        return Err(Error::invalid_input(format!(
1582                            "Duplicate base path ID {} detected. Base path IDs must be unique.",
1583                            base_path.id
1584                        )));
1585                    }
1586                    reference_paths.insert(base_path.id, base_path.clone());
1587                }
1588            } else {
1589                // OVERWRITE mode with initial_bases should have been rejected by validation
1590                // This branch should never be reached
1591                return Err(Error::invalid_input(
1592                    "OVERWRITE mode cannot register new bases. This should have been caught by validation.",
1593                ));
1594            }
1595        }
1596
1597        // Get the schema and the final fragment list
1598        let schema = match self.operation {
1599            Operation::Overwrite { ref schema, .. } => schema.clone(),
1600            Operation::Merge { ref schema, .. } => schema.clone(),
1601            Operation::Project { ref schema, .. } => schema.clone(),
1602            _ => {
1603                if let Some(current_manifest) = current_manifest {
1604                    current_manifest.schema.clone()
1605                } else {
1606                    return Err(Error::internal(
1607                        "Cannot create a new dataset without a schema".to_string(),
1608                    ));
1609                }
1610            }
1611        };
1612
1613        let mut fragment_id = if matches!(self.operation, Operation::Overwrite { .. }) {
1614            0
1615        } else {
1616            current_manifest
1617                .and_then(|m| m.max_fragment_id())
1618                .map(|id| id + 1)
1619                .unwrap_or(0)
1620        };
1621        let mut final_fragments = Vec::new();
1622        let mut final_indices = current_indices;
1623
1624        let mut next_row_id = {
1625            // Only use row ids if the feature flag is set already or
1626            match (current_manifest, config.use_stable_row_ids) {
1627                (Some(manifest), _) if manifest.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0 => {
1628                    Some(manifest.next_row_id)
1629                }
1630                (None, true) => Some(0),
1631                (_, false) => None,
1632                (Some(_), true) => {
1633                    return Err(Error::not_supported_source(
1634                        "Cannot enable stable row ids on existing dataset".into(),
1635                    ));
1636                }
1637            }
1638        };
1639
1640        let maybe_existing_fragments =
1641            current_manifest
1642                .map(|m| m.fragments.as_ref())
1643                .ok_or_else(|| {
1644                    Error::internal(format!(
1645                        "No current manifest was provided while building manifest for operation {}",
1646                        self.operation.name()
1647                    ))
1648                });
1649
1650        match &self.operation {
1651            Operation::Clone { .. } => {
1652                return Err(Error::internal(
1653                    "Clone operation should not enter build_manifest.".to_string(),
1654                ));
1655            }
1656            Operation::Append { fragments } => {
1657                final_fragments.extend(maybe_existing_fragments?.clone());
1658                let mut new_fragments =
1659                    Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
1660                        .collect::<Vec<_>>();
1661                if let Some(next_row_id) = &mut next_row_id {
1662                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1663                    // Add version metadata for all new fragments
1664                    let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1665                    for fragment in new_fragments.iter_mut() {
1666                        let version_meta =
1667                            lance_table::rowids::version::build_version_meta(fragment, new_version);
1668                        fragment.last_updated_at_version_meta = version_meta.clone();
1669                        fragment.created_at_version_meta = version_meta;
1670                    }
1671                }
1672                final_fragments.extend(new_fragments);
1673            }
1674            Operation::Delete {
1675                updated_fragments,
1676                deleted_fragment_ids,
1677                ..
1678            } => {
1679                // Remove the deleted fragments
1680                final_fragments.extend(maybe_existing_fragments?.clone());
1681                final_fragments.retain(|f| !deleted_fragment_ids.contains(&f.id));
1682                final_fragments.iter_mut().for_each(|f| {
1683                    for updated in updated_fragments {
1684                        if updated.id == f.id {
1685                            *f = updated.clone();
1686                        }
1687                    }
1688                });
1689                Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
1690            }
1691            Operation::Update {
1692                removed_fragment_ids,
1693                updated_fragments,
1694                new_fragments,
1695                fields_modified,
1696                merged_generations,
1697                fields_for_preserving_frag_bitmap,
1698                update_mode,
1699                ..
1700            } => {
1701                // Extract existing fragments once for reuse
1702                let existing_fragments = maybe_existing_fragments?;
1703
1704                // Apply updates to existing fragments
1705                let updated_frags: Vec<Fragment> = existing_fragments
1706                    .iter()
1707                    .filter_map(|f| {
1708                        if removed_fragment_ids.contains(&f.id) {
1709                            return None;
1710                        }
1711                        if let Some(updated) = updated_fragments.iter().find(|uf| uf.id == f.id) {
1712                            Some(updated.clone())
1713                        } else {
1714                            Some(f.clone())
1715                        }
1716                    })
1717                    .collect();
1718
1719                // Update version metadata for updated fragments if stable row IDs are enabled
1720                // Note: We don't update version metadata for fragments with deletion vectors
1721                // because the version sequences are indexed by physical row position, not logical position.
1722                // Version metadata for deleted rows will be filtered out during scan using the deletion vector.
1723                if next_row_id.is_some() {
1724                    // Version metadata will be properly set during compaction when deletions are materialized
1725                }
1726
1727                final_fragments.extend(updated_frags);
1728
1729                // If we updated any fields, remove those fragments from indices covering those fields
1730                Self::prune_updated_fields_from_indices(
1731                    &mut final_indices,
1732                    updated_fragments,
1733                    fields_modified,
1734                );
1735
1736                let mut new_fragments =
1737                    Self::fragments_with_ids(new_fragments.clone(), &mut fragment_id)
1738                        .collect::<Vec<_>>();
1739
1740                // Assign row IDs to any fragments that don't have them yet
1741                // (e.g., inserted rows from merge_insert operations)
1742                if let Some(next_row_id) = &mut next_row_id {
1743                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1744                }
1745
1746                // Set version metadata for newly created fragments (updated rows)
1747                // Preserve created_at from original fragments, set last_updated to new version
1748                if next_row_id.is_some() {
1749                    let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1750
1751                    // Build a map of original fragment ID -> original fragment for lookup
1752                    let original_frags_map: std::collections::HashMap<u64, &Fragment> =
1753                        existing_fragments.iter().map(|f| (f.id, f)).collect();
1754
1755                    for fragment in new_fragments.iter_mut() {
1756                        // For update operations with RewriteRows mode:
1757                        // - Rows are deleted from old fragments and rewritten to new fragments
1758                        // - last_updated_at should be the current version (when update happened)
1759                        // - created_at should be preserved from the original fragment
1760
1761                        // Read row IDs from this fragment to find original fragments
1762                        let row_ids = if let Some(row_id_meta) = &fragment.row_id_meta {
1763                            match row_id_meta {
1764                                lance_table::format::RowIdMeta::Inline(data) => {
1765                                    lance_table::rowids::read_row_ids(data).ok()
1766                                }
1767                                lance_table::format::RowIdMeta::External(_) => None,
1768                            }
1769                        } else {
1770                            None
1771                        };
1772
1773                        if let Some(row_ids) = row_ids {
1774                            // Extract created_at version for each row from original fragments
1775                            let physical_rows = fragment.physical_rows.unwrap_or(0);
1776                            let mut created_at_versions = Vec::with_capacity(physical_rows);
1777
1778                            for row_id in row_ids.iter() {
1779                                // Row ID format: upper 32 bits = fragment ID, lower 32 bits = row offset
1780                                let orig_frag_id = row_id >> 32;
1781                                let row_offset = (row_id & 0xFFFFFFFF) as usize;
1782
1783                                // Look up the original fragment
1784                                if let Some(orig_frag) = original_frags_map.get(&orig_frag_id) {
1785                                    // Get created_at version from original fragment's metadata
1786                                    let created_version = if let Some(created_meta) =
1787                                        &orig_frag.created_at_version_meta
1788                                    {
1789                                        // Load and index into the version sequence
1790                                        match created_meta.load_sequence() {
1791                                            Ok(seq) => {
1792                                                let versions: Vec<u64> = seq.versions().collect();
1793                                                versions.get(row_offset).copied().unwrap_or(1)
1794                                            }
1795                                            Err(_e) => {
1796                                                1 // Default to version 1 on error
1797                                            }
1798                                        }
1799                                    } else {
1800                                        // No metadata on original fragment, default to version 1
1801                                        1
1802                                    };
1803                                    created_at_versions.push(created_version);
1804                                } else {
1805                                    // Original fragment not found, default to version 1
1806                                    created_at_versions.push(1);
1807                                }
1808                            }
1809
1810                            // Build version metadata from the collected versions
1811                            // Compress into runs: consecutive identical versions become one run
1812                            let mut runs = Vec::new();
1813                            if !created_at_versions.is_empty() {
1814                                let mut current_version = created_at_versions[0];
1815                                let mut run_start = 0u64;
1816
1817                                for (i, &version) in created_at_versions.iter().enumerate().skip(1)
1818                                {
1819                                    if version != current_version {
1820                                        // End current run, start new one
1821                                        runs.push(lance_table::format::RowDatasetVersionRun {
1822                                            span: lance_table::rowids::segment::U64Segment::Range(
1823                                                run_start..i as u64,
1824                                            ),
1825                                            version: current_version,
1826                                        });
1827                                        current_version = version;
1828                                        run_start = i as u64;
1829                                    }
1830                                }
1831                                // Add final run
1832                                runs.push(lance_table::format::RowDatasetVersionRun {
1833                                    span: lance_table::rowids::segment::U64Segment::Range(
1834                                        run_start..created_at_versions.len() as u64,
1835                                    ),
1836                                    version: current_version,
1837                                });
1838                            }
1839
1840                            let created_at_seq =
1841                                lance_table::format::RowDatasetVersionSequence { runs };
1842                            fragment.created_at_version_meta = Some(
1843                                lance_table::format::RowDatasetVersionMeta::from_sequence(
1844                                    &created_at_seq,
1845                                )
1846                                .map_err(|e| {
1847                                    Error::internal(format!(
1848                                        "Failed to create created_at version metadata: {}",
1849                                        e
1850                                    ))
1851                                })?,
1852                            );
1853
1854                            // Set last_updated_at to the new version for all rows
1855                            let last_updated_meta =
1856                                lance_table::rowids::version::build_version_meta(
1857                                    fragment,
1858                                    new_version,
1859                                );
1860                            fragment.last_updated_at_version_meta = last_updated_meta;
1861                        } else {
1862                            // Fallback: can't read row IDs, set both to new version
1863                            let version_meta = lance_table::rowids::version::build_version_meta(
1864                                fragment,
1865                                new_version,
1866                            );
1867                            fragment.last_updated_at_version_meta = version_meta.clone();
1868                            fragment.created_at_version_meta = version_meta;
1869                        }
1870                    }
1871                }
1872
1873                if config.use_stable_row_ids
1874                    && update_mode.is_some()
1875                    && *update_mode == Some(RewriteRows)
1876                {
1877                    let pure_updated_frag_ids =
1878                        Self::collect_pure_rewrite_row_update_frags_ids(&new_fragments)?;
1879
1880                    // collect all the original frag ids that contains the updated rows
1881                    let original_fragment_ids: Vec<u64> = removed_fragment_ids
1882                        .iter()
1883                        .chain(updated_fragments.iter().map(|f| &f.id))
1884                        .copied()
1885                        .collect();
1886
1887                    Self::register_pure_rewrite_rows_update_frags_in_indices(
1888                        &mut final_indices,
1889                        &pure_updated_frag_ids,
1890                        &original_fragment_ids,
1891                        fields_for_preserving_frag_bitmap,
1892                    );
1893                }
1894
1895                if let Some(next_row_id) = &mut next_row_id {
1896                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1897                    // Note: Version metadata is already set above (lines 1627-1755)
1898                    // for Update operations, preserving created_at from original fragments.
1899                    // Don't overwrite it here.
1900                }
1901                // Identify fragments that were updated or newly created in this update
1902                let mut target_ids: HashSet<u64> = HashSet::new();
1903                target_ids.extend(new_fragments.iter().map(|f| f.id));
1904                final_fragments.extend(new_fragments);
1905                Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments);
1906
1907                if !merged_generations.is_empty() {
1908                    update_mem_wal_index_merged_generations(
1909                        &mut final_indices,
1910                        current_manifest.map_or(1, |m| m.version + 1),
1911                        merged_generations.clone(),
1912                    )?;
1913                }
1914            }
1915            Operation::Overwrite { fragments, .. } => {
1916                let mut new_fragments =
1917                    Self::fragments_with_ids(fragments.clone(), &mut fragment_id)
1918                        .collect::<Vec<_>>();
1919                if let Some(next_row_id) = &mut next_row_id {
1920                    Self::assign_row_ids(next_row_id, new_fragments.as_mut_slice())?;
1921                    // Add version metadata for all new fragments
1922                    let new_version = current_manifest.map(|m| m.version + 1).unwrap_or(1);
1923                    for fragment in new_fragments.iter_mut() {
1924                        let version_meta =
1925                            lance_table::rowids::version::build_version_meta(fragment, new_version);
1926                        fragment.last_updated_at_version_meta = version_meta.clone();
1927                        fragment.created_at_version_meta = version_meta;
1928                    }
1929                }
1930                final_fragments.extend(new_fragments);
1931                final_indices = Vec::new();
1932            }
1933            Operation::Rewrite {
1934                groups,
1935                rewritten_indices,
1936                frag_reuse_index,
1937            } => {
1938                final_fragments.extend(maybe_existing_fragments?.clone());
1939                let current_version = current_manifest.map(|m| m.version).unwrap_or_default();
1940                Self::handle_rewrite_fragments(
1941                    &mut final_fragments,
1942                    groups,
1943                    &mut fragment_id,
1944                    current_version,
1945                    next_row_id.as_ref(),
1946                )?;
1947
1948                if next_row_id.is_some() {
1949                    // We can re-use indices, but need to rewrite the fragment bitmaps
1950                    debug_assert!(rewritten_indices.is_empty());
1951                    for index in final_indices.iter_mut() {
1952                        if let Some(fragment_bitmap) = &mut index.fragment_bitmap {
1953                            *fragment_bitmap =
1954                                Self::recalculate_fragment_bitmap(fragment_bitmap, groups)?;
1955                        }
1956                    }
1957                } else {
1958                    Self::handle_rewrite_indices(&mut final_indices, rewritten_indices, groups)?;
1959                }
1960
1961                if let Some(frag_reuse_index) = frag_reuse_index {
1962                    final_indices.retain(|idx| idx.name != frag_reuse_index.name);
1963                    final_indices.push(frag_reuse_index.clone());
1964                }
1965            }
1966            Operation::CreateIndex {
1967                new_indices,
1968                removed_indices,
1969            } => {
1970                final_fragments.extend(maybe_existing_fragments?.clone());
1971                final_indices.retain(|existing_index| {
1972                    !new_indices
1973                        .iter()
1974                        .any(|new_index| new_index.name == existing_index.name)
1975                        && !removed_indices
1976                            .iter()
1977                            .any(|old_index| old_index.uuid == existing_index.uuid)
1978                });
1979                final_indices.extend(new_indices.clone());
1980            }
1981            Operation::ReserveFragments { .. } | Operation::UpdateConfig { .. } => {
1982                final_fragments.extend(maybe_existing_fragments?.clone());
1983            }
1984            Operation::Merge { fragments, .. } => {
1985                final_fragments.extend(fragments.clone());
1986
1987                // Some fields that have indices may have been removed, so we should
1988                // remove those indices as well.
1989                Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
1990            }
1991            Operation::Project { .. } => {
1992                final_fragments.extend(maybe_existing_fragments?.clone());
1993
1994                // We might have removed all fields for certain data files, so
1995                // we should remove the data files that are no longer relevant.
1996                let remaining_field_ids = schema
1997                    .fields_pre_order()
1998                    .map(|f| f.id)
1999                    .collect::<HashSet<_>>();
2000                for fragment in final_fragments.iter_mut() {
2001                    fragment.files.retain(|file| {
2002                        file.fields
2003                            .iter()
2004                            .any(|field_id| remaining_field_ids.contains(field_id))
2005                    });
2006                }
2007
2008                // Some fields that have indices may have been removed, so we should
2009                // remove those indices as well.
2010                Self::retain_relevant_indices(&mut final_indices, &schema, &final_fragments)
2011            }
2012            Operation::Restore { .. } => {
2013                unreachable!()
2014            }
2015            Operation::DataReplacement { replacements } => {
2016                log::warn!(
2017                    "Building manifest with DataReplacement operation. This operation is not stable yet, please use with caution."
2018                );
2019
2020                let (old_fragment_ids, new_datafiles): (Vec<&u64>, Vec<&DataFile>) = replacements
2021                    .iter()
2022                    .map(|DataReplacementGroup(fragment_id, new_file)| (fragment_id, new_file))
2023                    .unzip();
2024
2025                // 1. make sure the new files all have the same fields / or empty
2026                // NOTE: arguably this requirement could be relaxed in the future
2027                // for the sake of simplicity, we require the new files to have the same fields
2028                if new_datafiles
2029                    .iter()
2030                    .map(|f| f.fields.clone())
2031                    .collect::<HashSet<_>>()
2032                    .len()
2033                    > 1
2034                {
2035                    let field_info = new_datafiles
2036                        .iter()
2037                        .enumerate()
2038                        .map(|(id, f)| (id, f.fields.clone()))
2039                        .fold("".to_string(), |acc, (id, fields)| {
2040                            format!("{}File {}: {:?}\n", acc, id, fields)
2041                        });
2042
2043                    return Err(Error::invalid_input(format!(
2044                        "All new data files must have the same fields, but found different fields:\n{field_info}"
2045                    )));
2046                }
2047
2048                let existing_fragments = maybe_existing_fragments?;
2049
2050                // Collect replaced field IDs before consuming new_datafiles
2051                let replaced_fields: Vec<u32> = new_datafiles
2052                    .first()
2053                    .map(|f| {
2054                        f.fields
2055                            .iter()
2056                            .filter(|&&id| id >= 0)
2057                            .map(|&id| id as u32)
2058                            .collect()
2059                    })
2060                    .unwrap_or_default();
2061
2062                // 2. check that the fragments being modified have isomorphic layouts along the columns being replaced
2063                // 3. add modified fragments to final_fragments
2064                for (frag_id, new_file) in old_fragment_ids.iter().zip(new_datafiles) {
2065                    let frag = existing_fragments
2066                        .iter()
2067                        .find(|f| f.id == **frag_id)
2068                        .ok_or_else(|| {
2069                            Error::invalid_input(
2070                                "Fragment being replaced not found in existing fragments",
2071                            )
2072                        })?;
2073                    let mut new_frag = frag.clone();
2074
2075                    // TODO(rmeng): check new file and fragment are the same length
2076
2077                    let mut columns_covered = HashSet::new();
2078                    for file in &mut new_frag.files {
2079                        if file.fields == new_file.fields
2080                            && file.file_major_version == new_file.file_major_version
2081                            && file.file_minor_version == new_file.file_minor_version
2082                        {
2083                            // assign the new file path / size to the fragment
2084                            file.path = new_file.path.clone();
2085                            file.file_size_bytes = new_file.file_size_bytes.clone();
2086                        }
2087                        columns_covered.extend(file.fields.iter());
2088                    }
2089                    // SPECIAL CASE: if the column(s) being replaced are not covered by the fragment
2090                    // Then it means it's a all-NULL column that is being replaced with real data
2091                    // just add it to the final fragments
2092                    if columns_covered.is_disjoint(&new_file.fields.iter().collect()) {
2093                        new_frag.add_file(
2094                            new_file.path.clone(),
2095                            new_file.fields.clone(),
2096                            new_file.column_indices.clone(),
2097                            &LanceFileVersion::try_from_major_minor(
2098                                new_file.file_major_version,
2099                                new_file.file_minor_version,
2100                            )
2101                            .expect("Expected valid file version"),
2102                            new_file.file_size_bytes.get(),
2103                        );
2104                    }
2105
2106                    // Nothing changed in the current fragment, which is not expected -- error out
2107                    if &new_frag == frag {
2108                        return Err(Error::invalid_input(
2109                            "Expected to modify the fragment but no changes were made. This means the new data files does not align with any exiting datafiles. Please check if the schema of the new data files matches the schema of the old data files including the file major and minor versions",
2110                        ));
2111                    }
2112                    final_fragments.push(new_frag);
2113                }
2114
2115                let fragments_changed = old_fragment_ids
2116                    .iter()
2117                    .cloned()
2118                    .cloned()
2119                    .collect::<HashSet<_>>();
2120
2121                // 4. push fragments that didn't change back to final_fragments
2122                let unmodified_fragments = existing_fragments
2123                    .iter()
2124                    .filter(|f| !fragments_changed.contains(&f.id))
2125                    .cloned()
2126                    .collect::<Vec<_>>();
2127
2128                final_fragments.extend(unmodified_fragments);
2129
2130                // 5. Invalidate index bitmaps for replaced fields
2131                let modified_fragments: Vec<Fragment> = final_fragments
2132                    .iter()
2133                    .filter(|f| fragments_changed.contains(&f.id))
2134                    .cloned()
2135                    .collect();
2136
2137                Self::prune_updated_fields_from_indices(
2138                    &mut final_indices,
2139                    &modified_fragments,
2140                    &replaced_fields,
2141                );
2142            }
2143            Operation::UpdateMemWalState { merged_generations } => {
2144                update_mem_wal_index_merged_generations(
2145                    &mut final_indices,
2146                    current_manifest.map_or(1, |m| m.version + 1),
2147                    merged_generations.clone(),
2148                )?;
2149            }
2150            Operation::UpdateBases { .. } => {
2151                // UpdateBases operation doesn't modify fragments or indices
2152                // Base paths are handled in the manifest creation section below
2153                final_fragments.extend(maybe_existing_fragments?.clone());
2154            }
2155        };
2156
2157        // If a fragment was reserved then it may not belong at the end of the fragments list.
2158        final_fragments.sort_by_key(|frag| frag.id);
2159
2160        // Clean up data files that only contain tombstoned fields
2161        Self::remove_tombstoned_data_files(&mut final_fragments);
2162
2163        let user_requested_version = match (&config.storage_format, config.use_legacy_format) {
2164            (Some(storage_format), _) => Some(storage_format.lance_file_version()?),
2165            (None, Some(true)) => Some(LanceFileVersion::Legacy),
2166            (None, Some(false)) => Some(LanceFileVersion::V2_0),
2167            (None, None) => None,
2168        };
2169
2170        let mut manifest = if let Some(current_manifest) = current_manifest {
2171            // OVERWRITE with initial_bases on existing dataset is not allowed (caught by validation)
2172            // So we always use new_from_previous which preserves base_paths
2173            let mut prev_manifest =
2174                Manifest::new_from_previous(current_manifest, schema, Arc::new(final_fragments));
2175
2176            if let (Some(user_requested_version), Operation::Overwrite { .. }) =
2177                (user_requested_version, &self.operation)
2178            {
2179                // If this is an overwrite operation and the user has requested a specific version
2180                // then overwrite with that version.  Otherwise, if the user didn't request a specific
2181                // version, then overwrite with whatever version we had before.
2182                prev_manifest.data_storage_format = DataStorageFormat::new(user_requested_version);
2183            }
2184
2185            prev_manifest
2186        } else {
2187            let data_storage_format =
2188                Self::data_storage_format_from_files(&final_fragments, user_requested_version)?;
2189            Manifest::new(
2190                schema,
2191                Arc::new(final_fragments),
2192                data_storage_format,
2193                reference_paths,
2194            )
2195        };
2196
2197        manifest.tag.clone_from(&self.tag);
2198
2199        if config.auto_set_feature_flags {
2200            apply_feature_flags(
2201                &mut manifest,
2202                config.use_stable_row_ids,
2203                config.disable_transaction_file,
2204            )?;
2205        }
2206        manifest.set_timestamp(timestamp_to_nanos(config.timestamp));
2207
2208        manifest.update_max_fragment_id();
2209
2210        match &self.operation {
2211            Operation::Overwrite {
2212                config_upsert_values: Some(tm),
2213                ..
2214            } => {
2215                manifest.config_mut().extend(tm.clone());
2216            }
2217            Operation::UpdateConfig {
2218                config_updates,
2219                table_metadata_updates,
2220                schema_metadata_updates,
2221                field_metadata_updates,
2222            } => {
2223                if let Some(config_updates) = config_updates {
2224                    let mut config = manifest.config.clone();
2225                    apply_update_map(&mut config, config_updates);
2226                    manifest.config = config;
2227                }
2228                if let Some(table_metadata_updates) = table_metadata_updates {
2229                    let mut table_metadata = manifest.table_metadata.clone();
2230                    apply_update_map(&mut table_metadata, table_metadata_updates);
2231                    manifest.table_metadata = table_metadata;
2232                }
2233                if let Some(schema_metadata_updates) = schema_metadata_updates {
2234                    let mut schema_metadata = manifest.schema.metadata.clone();
2235                    apply_update_map(&mut schema_metadata, schema_metadata_updates);
2236                    manifest.schema.metadata = schema_metadata;
2237                }
2238                for (field_id, field_metadata_update) in field_metadata_updates {
2239                    if let Some(field) = manifest.schema.field_by_id_mut(*field_id) {
2240                        apply_update_map(&mut field.metadata, field_metadata_update);
2241                    } else {
2242                        return Err(Error::invalid_input_source(
2243                            format!("Field with id {} does not exist", field_id).into(),
2244                        ));
2245                    }
2246                }
2247            }
2248            _ => {}
2249        }
2250
2251        // Handle UpdateBases operation to update manifest base_paths
2252        if let Operation::UpdateBases { new_bases } = &self.operation {
2253            // Validate and add new base paths to the manifest
2254            for new_base in new_bases {
2255                // Check for conflicts with existing base paths
2256                if let Some(existing_base) = manifest
2257                    .base_paths
2258                    .values()
2259                    .find(|bp| bp.name == new_base.name || bp.path == new_base.path)
2260                {
2261                    return Err(Error::invalid_input(format!(
2262                        "Conflict detected: Base path with name '{:?}' or path '{}' already exists. Existing: name='{:?}', path='{}'",
2263                        new_base.name, new_base.path, existing_base.name, existing_base.path
2264                    )));
2265                }
2266
2267                // Assign a new ID if not already assigned
2268                let mut base_to_add = new_base.clone();
2269                if base_to_add.id == 0 {
2270                    let next_id = manifest
2271                        .base_paths
2272                        .keys()
2273                        .max()
2274                        .map(|&id| id + 1)
2275                        .unwrap_or(1);
2276                    base_to_add.id = next_id;
2277                }
2278
2279                manifest.base_paths.insert(base_to_add.id, base_to_add);
2280            }
2281        }
2282
2283        if let Operation::ReserveFragments { num_fragments } = self.operation {
2284            manifest.max_fragment_id = Some(manifest.max_fragment_id.unwrap_or(0) + num_fragments);
2285        }
2286
2287        manifest.transaction_file = Some(transaction_file_path.to_string());
2288
2289        if let Some(next_row_id) = next_row_id {
2290            manifest.next_row_id = next_row_id;
2291        }
2292
2293        Ok((manifest, final_indices))
2294    }
2295
2296    fn register_pure_rewrite_rows_update_frags_in_indices(
2297        indices: &mut [IndexMetadata],
2298        pure_update_frag_ids: &[u64],
2299        original_fragment_ids: &[u64],
2300        fields_for_preserving_frag_bitmap: &[u32],
2301    ) {
2302        if pure_update_frag_ids.is_empty() {
2303            return;
2304        }
2305
2306        let value_updated_field_set = fields_for_preserving_frag_bitmap
2307            .iter()
2308            .collect::<HashSet<_>>();
2309
2310        for index in indices.iter_mut() {
2311            let index_covers_modified_field = index.fields.iter().any(|field_id| {
2312                value_updated_field_set.contains(&u32::try_from(*field_id).unwrap())
2313            });
2314
2315            if !index_covers_modified_field
2316                && let Some(fragment_bitmap) = &mut index.fragment_bitmap
2317            {
2318                // check if all the original fragments contains the updating rows are covered
2319                // by the index(index fragment bitmap contains these frag ids).
2320                // if not, that means not all the updating rows are indexed, so we could not
2321                // index them.
2322                let index_covers_all_original_fragments = original_fragment_ids
2323                    .iter()
2324                    .all(|&fragment_id| fragment_bitmap.contains(fragment_id as u32));
2325
2326                if index_covers_all_original_fragments {
2327                    for fragment_id in pure_update_frag_ids.iter().map(|f| *f as u32) {
2328                        fragment_bitmap.insert(fragment_id);
2329                    }
2330                }
2331            }
2332        }
2333    }
2334
2335    /// If an operation modifies one or more fields in a fragment then we need to remove
2336    /// that fragment from any indices that cover one of the modified fields.
2337    fn prune_updated_fields_from_indices(
2338        indices: &mut [IndexMetadata],
2339        updated_fragments: &[Fragment],
2340        fields_modified: &[u32],
2341    ) {
2342        if fields_modified.is_empty() {
2343            return;
2344        }
2345
2346        // If we modified any fields in the fragments then we need to remove those fragments
2347        // from the index if the index covers one of those modified fields.
2348        let fields_modified_set = fields_modified.iter().collect::<HashSet<_>>();
2349        for index in indices.iter_mut() {
2350            if index
2351                .fields
2352                .iter()
2353                .any(|field_id| fields_modified_set.contains(&u32::try_from(*field_id).unwrap()))
2354                && let Some(fragment_bitmap) = &mut index.fragment_bitmap
2355            {
2356                for fragment_id in updated_fragments.iter().map(|f| f.id as u32) {
2357                    fragment_bitmap.remove(fragment_id);
2358                }
2359            }
2360        }
2361    }
2362
2363    fn is_vector_index(index: &IndexMetadata) -> bool {
2364        if let Some(details) = &index.index_details {
2365            details.type_url.ends_with("VectorIndexDetails")
2366        } else {
2367            false
2368        }
2369    }
2370
2371    /// Remove data files that only contain tombstoned fields (-2)
2372    /// These files no longer contain any live data and can be safely dropped
2373    fn remove_tombstoned_data_files(fragments: &mut [Fragment]) {
2374        for fragment in fragments {
2375            fragment.files.retain(|file| {
2376                // Keep file if it has at least one non-tombstoned field
2377                file.fields.iter().any(|&field_id| field_id != -2)
2378            });
2379        }
2380    }
2381
    /// Drop indices that are no longer relevant after a schema- or fragment-changing
    /// operation (called from the Merge and Project arms above).
    ///
    /// Retention rules, in order:
    /// 1. Indices over fields removed from `schema` are dropped, except system indices.
    /// 2. Fragment reuse indices (`FRAG_REUSE_INDEX_NAME`) are always kept.
    /// 3. For same-named indices (multiple deltas/versions of one logical index):
    ///    - if any has a non-empty effective fragment bitmap, keep all non-empty ones;
    ///    - if all are empty, keep only the oldest (lowest `dataset_version`) and only
    ///      if it is not a vector index — empty vector indices are removed entirely.
    /// 4. A single empty vector index is likewise dropped; any other single index is kept.
    fn retain_relevant_indices(
        indices: &mut Vec<IndexMetadata>,
        schema: &Schema,
        fragments: &[Fragment],
    ) {
        // Field ids still present in the (possibly pruned) schema.
        let field_ids = schema
            .fields_pre_order()
            .map(|f| f.id)
            .collect::<HashSet<_>>();

        // Remove indices for fields no longer in schema
        indices.retain(|existing_index| {
            existing_index
                .fields
                .iter()
                .all(|field_id| field_ids.contains(field_id))
                || is_system_index(existing_index)
        });

        // Fragment bitmaps record which fragments the index was originally built for.
        // Operations like updates and data replacement prune these bitmaps, and
        // effective_fragment_bitmap intersects with existing fragments at query time.

        // Apply retention logic for indices with empty bitmaps per index name
        // (except for fragment reuse indices which are always kept)
        let mut indices_by_name: std::collections::HashMap<String, Vec<&IndexMetadata>> =
            std::collections::HashMap::new();

        // Group indices by name
        for index in indices.iter() {
            if index.name != FRAG_REUSE_INDEX_NAME {
                indices_by_name
                    .entry(index.name.clone())
                    .or_default()
                    .push(index);
            }
        }

        // Build a set of UUIDs to keep based on retention rules
        let mut uuids_to_keep = std::collections::HashSet::new();

        // Bitmap of fragments that still exist; used to compute each index's
        // effective (live) coverage below.
        let existing_fragments = fragments
            .iter()
            .map(|f| f.id as u32)
            .collect::<RoaringBitmap>();

        // For each group of indices with the same name
        for (_, same_name_indices) in indices_by_name {
            if same_name_indices.len() > 1 {
                // Separate empty and non-empty indices.  "Empty" means the index
                // has no bitmap at all, or its bitmap no longer intersects any
                // live fragment.
                let (empty_indices, non_empty_indices): (Vec<_>, Vec<_>) =
                    same_name_indices.iter().partition(|index| {
                        index
                            .effective_fragment_bitmap(&existing_fragments)
                            .as_ref()
                            .is_none_or(|bitmap| bitmap.is_empty())
                    });

                if non_empty_indices.is_empty() {
                    // All indices are empty - for scalar indices, keep only the first (oldest) one
                    // For vector indices, remove all of them
                    let mut sorted_indices = empty_indices;
                    sorted_indices.sort_by_key(|index: &&IndexMetadata| index.dataset_version); // Sort by ascending dataset_version

                    // Keep only the first (oldest) if it's not a vector index
                    if let Some(oldest) = sorted_indices.first()
                        && !Self::is_vector_index(oldest)
                    {
                        uuids_to_keep.insert(oldest.uuid);
                    }
                } else {
                    // At least one index has non-empty bitmap - keep all non-empty indices
                    for index in non_empty_indices {
                        uuids_to_keep.insert(index.uuid);
                    }
                }
            } else {
                // Single index - keep it unless it's an empty vector index
                if let Some(index) = same_name_indices.first() {
                    let is_empty = index
                        .effective_fragment_bitmap(&existing_fragments)
                        .as_ref()
                        .is_none_or(|bitmap| bitmap.is_empty());
                    let is_vector = Self::is_vector_index(index);

                    // Keep the index unless it's an empty vector index
                    if !is_empty || !is_vector {
                        uuids_to_keep.insert(index.uuid);
                    }
                }
            }
        }

        // Use Vec::retain to safely remove indices
        indices.retain(|index| {
            index.name == FRAG_REUSE_INDEX_NAME || uuids_to_keep.contains(&index.uuid)
        });
    }
2480
2481    fn recalculate_fragment_bitmap(
2482        old: &RoaringBitmap,
2483        groups: &[RewriteGroup],
2484    ) -> Result<RoaringBitmap> {
2485        let mut new_bitmap = old.clone();
2486        for group in groups {
2487            let any_in_index = group
2488                .old_fragments
2489                .iter()
2490                .any(|frag| old.contains(frag.id as u32));
2491            let all_in_index = group
2492                .old_fragments
2493                .iter()
2494                .all(|frag| old.contains(frag.id as u32));
2495            // Any rewrite group may or may not be covered by the index.  However, if any fragment
2496            // in a rewrite group was previously covered by the index then all fragments in the rewrite
2497            // group must have been previously covered by the index.  plan_compaction takes care of
2498            // this for us so this should be safe to assume.
2499            if any_in_index {
2500                if all_in_index {
2501                    for frag_id in group.old_fragments.iter().map(|frag| frag.id as u32) {
2502                        new_bitmap.remove(frag_id);
2503                    }
2504                    new_bitmap.extend(group.new_fragments.iter().map(|frag| frag.id as u32));
2505                } else {
2506                    return Err(Error::invalid_input(
2507                        "The compaction plan included a rewrite group that was a split of indexed and non-indexed data",
2508                    ));
2509                }
2510            }
2511        }
2512        Ok(new_bitmap)
2513    }
2514
2515    fn handle_rewrite_indices(
2516        indices: &mut [IndexMetadata],
2517        rewritten_indices: &[RewrittenIndex],
2518        groups: &[RewriteGroup],
2519    ) -> Result<()> {
2520        let mut modified_indices = HashSet::new();
2521
2522        for rewritten_index in rewritten_indices {
2523            if !modified_indices.insert(rewritten_index.old_id) {
2524                return Err(Error::invalid_input(format!(
2525                    "An invalid compaction plan must have been generated because multiple tasks modified the same index: {}",
2526                    rewritten_index.old_id
2527                )));
2528            }
2529
2530            // Skip indices that no longer exist (may have been removed by concurrent operation)
2531            let Some(index) = indices
2532                .iter_mut()
2533                .find(|idx| idx.uuid == rewritten_index.old_id)
2534            else {
2535                continue;
2536            };
2537
2538            index.fragment_bitmap = Some(Self::recalculate_fragment_bitmap(
2539                index.fragment_bitmap.as_ref().ok_or_else(|| {
2540                    Error::invalid_input(format!(
2541                        "Cannot rewrite index {} which did not store fragment bitmap",
2542                        index.uuid
2543                    ))
2544                })?,
2545                groups,
2546            )?);
2547            index.uuid = rewritten_index.new_id;
2548        }
2549        Ok(())
2550    }
2551
2552    fn handle_rewrite_fragments(
2553        final_fragments: &mut Vec<Fragment>,
2554        groups: &[RewriteGroup],
2555        fragment_id: &mut u64,
2556        version: u64,
2557        _next_row_id: Option<&u64>,
2558    ) -> Result<()> {
2559        for group in groups {
2560            // If the old fragments are contiguous, find the range
2561            let replace_range = {
2562                let start = final_fragments
2563                    .iter()
2564                    .enumerate()
2565                    .find(|(_, f)| f.id == group.old_fragments[0].id)
2566                    .ok_or_else(|| {
2567                        Error::commit_conflict_source(
2568                            version,
2569                            format!(
2570                                "dataset does not contain a fragment a rewrite operation wants to replace: id={}",
2571                                group.old_fragments[0].id
2572                            )
2573                            .into(),
2574                        )
2575                    })?
2576                    .0;
2577
2578                // Verify old_fragments matches contiguous range
2579                let mut i = 1;
2580                loop {
2581                    if i == group.old_fragments.len() {
2582                        break Some(start..start + i);
2583                    }
2584                    if final_fragments[start + i].id != group.old_fragments[i].id {
2585                        break None;
2586                    }
2587                    i += 1;
2588                }
2589            };
2590
2591            let new_fragments = Self::fragments_with_ids(group.new_fragments.clone(), fragment_id)
2592                .collect::<Vec<_>>();
2593
2594            // Version metadata for rewritten fragments is handled by the compaction code
2595            // (recalc_versions_for_rewritten_fragments) which preserves version information
2596            // from the original fragments. We don't modify it here.
2597
2598            if let Some(replace_range) = replace_range {
2599                // Efficiently path using slice
2600                final_fragments.splice(replace_range, new_fragments);
2601            } else {
2602                // Slower path for non-contiguous ranges
2603                for fragment in group.old_fragments.iter() {
2604                    final_fragments.retain(|f| f.id != fragment.id);
2605                }
2606                final_fragments.extend(new_fragments);
2607            }
2608        }
2609        Ok(())
2610    }
2611
2612    /// collect the pure(the num of row IDs are equal to the physical rows) "rewrite rows" updated fragment ids
2613    fn collect_pure_rewrite_row_update_frags_ids(fragments: &[Fragment]) -> Result<Vec<u64>> {
2614        let mut pure_update_frag_ids = Vec::new();
2615
2616        for fragment in fragments {
2617            let physical_rows = fragment
2618                .physical_rows
2619                .ok_or_else(|| Error::internal("Fragment does not have physical rows"))?
2620                as u64;
2621
2622            if let Some(row_id_meta) = &fragment.row_id_meta {
2623                let existing_row_count = match row_id_meta {
2624                    RowIdMeta::Inline(data) => {
2625                        let sequence = read_row_ids(data)?;
2626                        sequence.len() as u64
2627                    }
2628                    _ => 0,
2629                };
2630
2631                // only filter the fragments that match: all the rows have row id,
2632                // which means it does not contain inserted rows in this fragment
2633                if existing_row_count == physical_rows {
2634                    pure_update_frag_ids.push(fragment.id);
2635                }
2636            }
2637        }
2638
2639        Ok(pure_update_frag_ids)
2640    }
2641
2642    fn assign_row_ids(next_row_id: &mut u64, fragments: &mut [Fragment]) -> Result<()> {
2643        for fragment in fragments {
2644            let physical_rows = fragment
2645                .physical_rows
2646                .ok_or_else(|| Error::internal("Fragment does not have physical rows"))?
2647                as u64;
2648
2649            if fragment.row_id_meta.is_some() {
2650                // we may meet merge insert case, it only has partial row ids.
2651                // so here, we need to check if the row ids match the physical rows
2652                // if yes, continue
2653                // if not, fill the remaining row ids to the physical rows, then update row_id_meta
2654
2655                // Check if existing row IDs match the physical rows count
2656                let existing_row_count = match &fragment.row_id_meta {
2657                    Some(RowIdMeta::Inline(data)) => {
2658                        // Parse the serialized row ID sequence to get the count
2659                        let sequence = read_row_ids(data)?;
2660                        sequence.len() as u64
2661                    }
2662                    _ => 0,
2663                };
2664
2665                match existing_row_count.cmp(&physical_rows) {
2666                    Ordering::Equal => {
2667                        // Row IDs already match physical rows, continue to next fragment
2668                        continue;
2669                    }
2670                    Ordering::Less => {
2671                        // Partial row IDs - need to fill the remaining ones
2672                        let remaining_rows = physical_rows - existing_row_count;
2673                        let new_row_ids = *next_row_id..(*next_row_id + remaining_rows);
2674
2675                        // Merge existing and new row IDs
2676                        let combined_sequence = match &fragment.row_id_meta {
2677                            Some(RowIdMeta::Inline(data)) => read_row_ids(data)?,
2678                            _ => {
2679                                return Err(Error::internal(
2680                                    "Failed to deserialize existing row ID sequence",
2681                                ));
2682                            }
2683                        };
2684
2685                        let mut row_ids: Vec<u64> = combined_sequence.iter().collect();
2686                        for row_id in new_row_ids {
2687                            row_ids.push(row_id);
2688                        }
2689                        let combined_sequence = RowIdSequence::from(row_ids.as_slice());
2690
2691                        let serialized = write_row_ids(&combined_sequence);
2692                        fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
2693                        *next_row_id += remaining_rows;
2694                    }
2695                    Ordering::Greater => {
2696                        // More row IDs than physical rows - this shouldn't happen
2697                        return Err(Error::internal(format!(
2698                            "Fragment has more row IDs ({}) than physical rows ({})",
2699                            existing_row_count, physical_rows
2700                        )));
2701                    }
2702                }
2703            } else {
2704                let row_ids = *next_row_id..(*next_row_id + physical_rows);
2705                let sequence = RowIdSequence::from(row_ids);
2706                // TODO: write to a separate file if large. Possibly share a file with other fragments.
2707                let serialized = write_row_ids(&sequence);
2708                fragment.row_id_meta = Some(RowIdMeta::Inline(serialized));
2709                *next_row_id += physical_rows;
2710            }
2711        }
2712        Ok(())
2713    }
2714}
2715
2716impl From<&DataReplacementGroup> for pb::transaction::DataReplacementGroup {
2717    fn from(DataReplacementGroup(fragment_id, new_file): &DataReplacementGroup) -> Self {
2718        Self {
2719            fragment_id: *fragment_id,
2720            new_file: Some(new_file.into()),
2721        }
2722    }
2723}
2724
2725/// Convert a protobug DataReplacementGroup to a rust native DataReplacementGroup
2726/// this is unfortunately TryFrom instead of From because of the Option in the pb::DataReplacementGroup
2727impl TryFrom<pb::transaction::DataReplacementGroup> for DataReplacementGroup {
2728    type Error = Error;
2729
2730    fn try_from(message: pb::transaction::DataReplacementGroup) -> Result<Self> {
2731        Ok(Self(
2732            message.fragment_id,
2733            message
2734                .new_file
2735                .ok_or(Error::invalid_input(
2736                    "DataReplacementGroup must have a new_file",
2737                ))?
2738                .try_into()?,
2739        ))
2740    }
2741}
2742
2743impl TryFrom<pb::Transaction> for Transaction {
2744    type Error = Error;
2745
2746    fn try_from(message: pb::Transaction) -> Result<Self> {
2747        let operation = match message.operation {
2748            Some(pb::transaction::Operation::Append(pb::transaction::Append { fragments })) => {
2749                Operation::Append {
2750                    fragments: fragments
2751                        .into_iter()
2752                        .map(Fragment::try_from)
2753                        .collect::<Result<Vec<_>>>()?,
2754                }
2755            }
2756            Some(pb::transaction::Operation::Clone(pb::transaction::Clone {
2757                is_shallow,
2758                ref_name,
2759                ref_version,
2760                ref_path,
2761                branch_name,
2762            })) => Operation::Clone {
2763                is_shallow,
2764                ref_name,
2765                ref_version,
2766                ref_path,
2767                branch_name,
2768            },
2769            Some(pb::transaction::Operation::Delete(pb::transaction::Delete {
2770                updated_fragments,
2771                deleted_fragment_ids,
2772                predicate,
2773            })) => Operation::Delete {
2774                updated_fragments: updated_fragments
2775                    .into_iter()
2776                    .map(Fragment::try_from)
2777                    .collect::<Result<Vec<_>>>()?,
2778                deleted_fragment_ids,
2779                predicate,
2780            },
2781            Some(pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
2782                fragments,
2783                schema,
2784                schema_metadata: _schema_metadata, // TODO: handle metadata
2785                config_upsert_values,
2786                initial_bases,
2787            })) => {
2788                let config_upsert_option = if config_upsert_values.is_empty() {
2789                    None
2790                } else {
2791                    Some(config_upsert_values)
2792                };
2793
2794                Operation::Overwrite {
2795                    fragments: fragments
2796                        .into_iter()
2797                        .map(Fragment::try_from)
2798                        .collect::<Result<Vec<_>>>()?,
2799                    schema: Schema::from(&Fields(schema)),
2800                    config_upsert_values: config_upsert_option,
2801                    initial_bases: if initial_bases.is_empty() {
2802                        None
2803                    } else {
2804                        Some(initial_bases.into_iter().map(BasePath::from).collect())
2805                    },
2806                }
2807            }
2808            Some(pb::transaction::Operation::ReserveFragments(
2809                pb::transaction::ReserveFragments { num_fragments },
2810            )) => Operation::ReserveFragments { num_fragments },
2811            Some(pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
2812                old_fragments,
2813                new_fragments,
2814                groups,
2815                rewritten_indices,
2816            })) => {
2817                let groups = if !groups.is_empty() {
2818                    groups
2819                        .into_iter()
2820                        .map(RewriteGroup::try_from)
2821                        .collect::<Result<_>>()?
2822                } else {
2823                    vec![RewriteGroup {
2824                        old_fragments: old_fragments
2825                            .into_iter()
2826                            .map(Fragment::try_from)
2827                            .collect::<Result<Vec<_>>>()?,
2828                        new_fragments: new_fragments
2829                            .into_iter()
2830                            .map(Fragment::try_from)
2831                            .collect::<Result<Vec<_>>>()?,
2832                    }]
2833                };
2834                let rewritten_indices = rewritten_indices
2835                    .iter()
2836                    .map(RewrittenIndex::try_from)
2837                    .collect::<Result<_>>()?;
2838
2839                Operation::Rewrite {
2840                    groups,
2841                    rewritten_indices,
2842                    frag_reuse_index: None,
2843                }
2844            }
2845            Some(pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
2846                new_indices,
2847                removed_indices,
2848            })) => Operation::CreateIndex {
2849                new_indices: new_indices
2850                    .into_iter()
2851                    .map(IndexMetadata::try_from)
2852                    .collect::<Result<_>>()?,
2853                removed_indices: removed_indices
2854                    .into_iter()
2855                    .map(IndexMetadata::try_from)
2856                    .collect::<Result<_>>()?,
2857            },
2858            Some(pb::transaction::Operation::Merge(pb::transaction::Merge {
2859                fragments,
2860                schema,
2861                schema_metadata: _schema_metadata, // TODO: handle metadata
2862            })) => Operation::Merge {
2863                fragments: fragments
2864                    .into_iter()
2865                    .map(Fragment::try_from)
2866                    .collect::<Result<Vec<_>>>()?,
2867                schema: Schema::from(&Fields(schema)),
2868            },
2869            Some(pb::transaction::Operation::Restore(pb::transaction::Restore { version })) => {
2870                Operation::Restore { version }
2871            }
2872            Some(pb::transaction::Operation::Update(pb::transaction::Update {
2873                removed_fragment_ids,
2874                updated_fragments,
2875                new_fragments,
2876                fields_modified,
2877                merged_generations,
2878                fields_for_preserving_frag_bitmap,
2879                update_mode,
2880                inserted_rows,
2881            })) => Operation::Update {
2882                removed_fragment_ids,
2883                updated_fragments: updated_fragments
2884                    .into_iter()
2885                    .map(Fragment::try_from)
2886                    .collect::<Result<Vec<_>>>()?,
2887                new_fragments: new_fragments
2888                    .into_iter()
2889                    .map(Fragment::try_from)
2890                    .collect::<Result<Vec<_>>>()?,
2891                fields_modified,
2892                merged_generations: merged_generations
2893                    .into_iter()
2894                    .map(|m| MergedGeneration::try_from(m).unwrap())
2895                    .collect(),
2896                fields_for_preserving_frag_bitmap,
2897                update_mode: match update_mode {
2898                    0 => Some(UpdateMode::RewriteRows),
2899                    1 => Some(UpdateMode::RewriteColumns),
2900                    _ => Some(UpdateMode::RewriteRows),
2901                },
2902                inserted_rows_filter: inserted_rows
2903                    .map(|ik| KeyExistenceFilter::try_from(&ik))
2904                    .transpose()?,
2905            },
2906            Some(pb::transaction::Operation::Project(pb::transaction::Project { schema })) => {
2907                Operation::Project {
2908                    schema: Schema::from(&Fields(schema)),
2909                }
2910            }
2911            Some(pb::transaction::Operation::UpdateConfig(update_config)) => {
2912                // Check if new-style fields are present
2913                let has_new_fields = update_config.config_updates.is_some()
2914                    || update_config.table_metadata_updates.is_some()
2915                    || update_config.schema_metadata_updates.is_some()
2916                    || !update_config.field_metadata_updates.is_empty();
2917
2918                // Check if old-style fields are present
2919                let has_old_fields = !update_config.upsert_values.is_empty()
2920                    || !update_config.delete_keys.is_empty()
2921                    || !update_config.schema_metadata.is_empty()
2922                    || !update_config.field_metadata.is_empty();
2923
2924                // Error if both are present
2925                if has_new_fields && has_old_fields {
2926                    return Err(Error::invalid_input_source(
2927                        "Cannot mix old and new style UpdateConfig fields".into(),
2928                    ));
2929                }
2930
2931                if has_old_fields {
2932                    // Translate old-style to new-style
2933                    let config_updates = if !update_config.upsert_values.is_empty()
2934                        || !update_config.delete_keys.is_empty()
2935                    {
2936                        Some(translate_config_updates(
2937                            &update_config.upsert_values,
2938                            &update_config.delete_keys,
2939                        ))
2940                    } else {
2941                        None
2942                    };
2943
2944                    let schema_metadata_updates = if !update_config.schema_metadata.is_empty() {
2945                        Some(translate_schema_metadata_updates(
2946                            &update_config.schema_metadata,
2947                        ))
2948                    } else {
2949                        None
2950                    };
2951
2952                    let field_metadata_updates = update_config
2953                        .field_metadata
2954                        .into_iter()
2955                        .map(|(field_id, field_meta_update)| {
2956                            (
2957                                field_id as i32,
2958                                translate_schema_metadata_updates(&field_meta_update.metadata),
2959                            )
2960                        })
2961                        .collect();
2962
2963                    Operation::UpdateConfig {
2964                        config_updates,
2965                        table_metadata_updates: None,
2966                        schema_metadata_updates,
2967                        field_metadata_updates,
2968                    }
2969                } else {
2970                    // Use new-style fields directly (convert from protobuf)
2971                    Operation::UpdateConfig {
2972                        config_updates: update_config.config_updates.as_ref().map(UpdateMap::from),
2973                        table_metadata_updates: update_config
2974                            .table_metadata_updates
2975                            .as_ref()
2976                            .map(UpdateMap::from),
2977                        schema_metadata_updates: update_config
2978                            .schema_metadata_updates
2979                            .as_ref()
2980                            .map(UpdateMap::from),
2981                        field_metadata_updates: update_config
2982                            .field_metadata_updates
2983                            .iter()
2984                            .map(|(field_id, pb_update_map)| {
2985                                (*field_id, UpdateMap::from(pb_update_map))
2986                            })
2987                            .collect(),
2988                    }
2989                }
2990            }
2991            Some(pb::transaction::Operation::DataReplacement(
2992                pb::transaction::DataReplacement { replacements },
2993            )) => Operation::DataReplacement {
2994                replacements: replacements
2995                    .into_iter()
2996                    .map(DataReplacementGroup::try_from)
2997                    .collect::<Result<Vec<_>>>()?,
2998            },
2999            Some(pb::transaction::Operation::UpdateMemWalState(
3000                pb::transaction::UpdateMemWalState { merged_generations },
3001            )) => Operation::UpdateMemWalState {
3002                merged_generations: merged_generations
3003                    .into_iter()
3004                    .map(|m| MergedGeneration::try_from(m).unwrap())
3005                    .collect(),
3006            },
3007            Some(pb::transaction::Operation::UpdateBases(pb::transaction::UpdateBases {
3008                new_bases,
3009            })) => Operation::UpdateBases {
3010                new_bases: new_bases.into_iter().map(BasePath::from).collect(),
3011            },
3012            None => {
3013                return Err(Error::internal(
3014                    "Transaction message did not contain an operation".to_string(),
3015                ));
3016            }
3017        };
3018        Ok(Self {
3019            read_version: message.read_version,
3020            uuid: message.uuid.clone(),
3021            operation,
3022            tag: if message.tag.is_empty() {
3023                None
3024            } else {
3025                Some(message.tag.clone())
3026            },
3027            transaction_properties: if message.transaction_properties.is_empty() {
3028                None
3029            } else {
3030                Some(Arc::new(message.transaction_properties))
3031            },
3032        })
3033    }
3034}
3035
3036impl TryFrom<&pb::transaction::rewrite::RewrittenIndex> for RewrittenIndex {
3037    type Error = Error;
3038
3039    fn try_from(message: &pb::transaction::rewrite::RewrittenIndex) -> Result<Self> {
3040        Ok(Self {
3041            old_id: message
3042                .old_id
3043                .as_ref()
3044                .map(Uuid::try_from)
3045                .ok_or_else(|| {
3046                    Error::invalid_input("required field (old_id) missing from message".to_string())
3047                })??,
3048            new_id: message
3049                .new_id
3050                .as_ref()
3051                .map(Uuid::try_from)
3052                .ok_or_else(|| {
3053                    Error::invalid_input("required field (new_id) missing from message".to_string())
3054                })??,
3055            new_index_details: message
3056                .new_index_details
3057                .as_ref()
3058                .ok_or_else(|| {
3059                    Error::invalid_input("new_index_details is a required field".to_string())
3060                })?
3061                .clone(),
3062            new_index_version: message.new_index_version,
3063        })
3064    }
3065}
3066
3067impl TryFrom<pb::transaction::rewrite::RewriteGroup> for RewriteGroup {
3068    type Error = Error;
3069
3070    fn try_from(message: pb::transaction::rewrite::RewriteGroup) -> Result<Self> {
3071        Ok(Self {
3072            old_fragments: message
3073                .old_fragments
3074                .into_iter()
3075                .map(Fragment::try_from)
3076                .collect::<Result<Vec<_>>>()?,
3077            new_fragments: message
3078                .new_fragments
3079                .into_iter()
3080                .map(Fragment::try_from)
3081                .collect::<Result<Vec<_>>>()?,
3082        })
3083    }
3084}
3085
/// Encode a native [`Transaction`] into its protobuf wire form.
///
/// Inverse of `TryFrom<pb::Transaction>`: `None` options are encoded as
/// empty strings/maps, and only new-style `UpdateConfig` fields are written.
impl From<&Transaction> for pb::Transaction {
    fn from(value: &Transaction) -> Self {
        let operation = match &value.operation {
            Operation::Append { fragments } => {
                pb::transaction::Operation::Append(pb::transaction::Append {
                    fragments: fragments.iter().map(pb::DataFragment::from).collect(),
                })
            }
            Operation::Clone {
                is_shallow,
                ref_name,
                ref_version,
                ref_path,
                branch_name,
            } => pb::transaction::Operation::Clone(pb::transaction::Clone {
                is_shallow: *is_shallow,
                ref_name: ref_name.clone(),
                ref_version: *ref_version,
                ref_path: ref_path.clone(),
                branch_name: branch_name.clone(),
            }),
            Operation::Delete {
                updated_fragments,
                deleted_fragment_ids,
                predicate,
            } => pb::transaction::Operation::Delete(pb::transaction::Delete {
                updated_fragments: updated_fragments
                    .iter()
                    .map(pb::DataFragment::from)
                    .collect(),
                deleted_fragment_ids: deleted_fragment_ids.clone(),
                predicate: predicate.clone(),
            }),
            Operation::Overwrite {
                fragments,
                schema,
                config_upsert_values,
                initial_bases,
            } => {
                pb::transaction::Operation::Overwrite(pb::transaction::Overwrite {
                    fragments: fragments.iter().map(pb::DataFragment::from).collect(),
                    schema: Fields::from(schema).0,
                    schema_metadata: Default::default(), // TODO: handle metadata
                    // `None` is encoded as an empty map on the wire.
                    config_upsert_values: config_upsert_values
                        .clone()
                        .unwrap_or(Default::default()),
                    initial_bases: initial_bases
                        .as_ref()
                        .map(|paths| {
                            paths
                                .iter()
                                .cloned()
                                .map(|bp: BasePath| -> pb::BasePath { bp.into() })
                                .collect::<Vec<pb::BasePath>>()
                        })
                        .unwrap_or_default(),
                })
            }
            Operation::ReserveFragments { num_fragments } => {
                pb::transaction::Operation::ReserveFragments(pb::transaction::ReserveFragments {
                    num_fragments: *num_fragments,
                })
            }
            // frag_reuse_index is in-memory-only state with no protobuf field.
            Operation::Rewrite {
                groups,
                rewritten_indices,
                frag_reuse_index: _,
            } => pb::transaction::Operation::Rewrite(pb::transaction::Rewrite {
                groups: groups
                    .iter()
                    .map(pb::transaction::rewrite::RewriteGroup::from)
                    .collect(),
                rewritten_indices: rewritten_indices
                    .iter()
                    .map(|rewritten| rewritten.into())
                    .collect(),
                // Leaves the legacy flat old/new fragment lists empty.
                ..Default::default()
            }),
            Operation::CreateIndex {
                new_indices,
                removed_indices,
            } => pb::transaction::Operation::CreateIndex(pb::transaction::CreateIndex {
                new_indices: new_indices.iter().map(pb::IndexMetadata::from).collect(),
                removed_indices: removed_indices
                    .iter()
                    .map(pb::IndexMetadata::from)
                    .collect(),
            }),
            Operation::Merge { fragments, schema } => {
                pb::transaction::Operation::Merge(pb::transaction::Merge {
                    fragments: fragments.iter().map(pb::DataFragment::from).collect(),
                    schema: Fields::from(schema).0,
                    schema_metadata: Default::default(), // TODO: handle metadata
                })
            }
            Operation::Restore { version } => {
                pb::transaction::Operation::Restore(pb::transaction::Restore { version: *version })
            }
            Operation::Update {
                removed_fragment_ids,
                updated_fragments,
                new_fragments,
                fields_modified,
                merged_generations,
                fields_for_preserving_frag_bitmap,
                update_mode,
                inserted_rows_filter,
            } => pb::transaction::Operation::Update(pb::transaction::Update {
                removed_fragment_ids: removed_fragment_ids.clone(),
                updated_fragments: updated_fragments
                    .iter()
                    .map(pb::DataFragment::from)
                    .collect(),
                new_fragments: new_fragments.iter().map(pb::DataFragment::from).collect(),
                fields_modified: fields_modified.clone(),
                merged_generations: merged_generations
                    .iter()
                    .map(pb::MergedGeneration::from)
                    .collect(),
                fields_for_preserving_frag_bitmap: fields_for_preserving_frag_bitmap.clone(),
                // Unset update_mode is encoded as 0 (RewriteRows).
                update_mode: update_mode
                    .as_ref()
                    .map(|mode| match mode {
                        UpdateMode::RewriteRows => 0,
                        UpdateMode::RewriteColumns => 1,
                    })
                    .unwrap_or(0),
                inserted_rows: inserted_rows_filter.as_ref().map(|ik| ik.into()),
            }),
            Operation::Project { schema } => {
                pb::transaction::Operation::Project(pb::transaction::Project {
                    schema: Fields::from(schema).0,
                })
            }
            Operation::UpdateConfig {
                config_updates,
                table_metadata_updates,
                schema_metadata_updates,
                field_metadata_updates,
            } => pb::transaction::Operation::UpdateConfig(pb::transaction::UpdateConfig {
                config_updates: config_updates
                    .as_ref()
                    .map(pb::transaction::UpdateMap::from),
                table_metadata_updates: table_metadata_updates
                    .as_ref()
                    .map(pb::transaction::UpdateMap::from),
                schema_metadata_updates: schema_metadata_updates
                    .as_ref()
                    .map(pb::transaction::UpdateMap::from),
                field_metadata_updates: field_metadata_updates
                    .iter()
                    .map(|(field_id, update_map)| {
                        (*field_id, pb::transaction::UpdateMap::from(update_map))
                    })
                    .collect(),
                // Leave old fields empty - we only write new-style fields
                upsert_values: Default::default(),
                delete_keys: Default::default(),
                schema_metadata: Default::default(),
                field_metadata: Default::default(),
            }),
            Operation::DataReplacement { replacements } => {
                pb::transaction::Operation::DataReplacement(pb::transaction::DataReplacement {
                    replacements: replacements
                        .iter()
                        .map(pb::transaction::DataReplacementGroup::from)
                        .collect(),
                })
            }
            Operation::UpdateMemWalState { merged_generations } => {
                pb::transaction::Operation::UpdateMemWalState(pb::transaction::UpdateMemWalState {
                    merged_generations: merged_generations
                        .iter()
                        .map(pb::MergedGeneration::from)
                        .collect::<Vec<_>>(),
                })
            }
            Operation::UpdateBases { new_bases } => {
                pb::transaction::Operation::UpdateBases(pb::transaction::UpdateBases {
                    new_bases: new_bases
                        .iter()
                        .cloned()
                        .map(|bp: BasePath| -> pb::BasePath { bp.into() })
                        .collect::<Vec<pb::BasePath>>(),
                })
            }
        };

        let transaction_properties = value
            .transaction_properties
            .as_ref()
            .map(|arc| arc.as_ref().clone())
            .unwrap_or_default();
        Self {
            read_version: value.read_version,
            uuid: value.uuid.clone(),
            operation: Some(operation),
            // A `None` tag is encoded as the empty string.
            tag: value.tag.clone().unwrap_or("".to_string()),
            transaction_properties,
        }
    }
}
3288
3289impl From<&RewrittenIndex> for pb::transaction::rewrite::RewrittenIndex {
3290    fn from(value: &RewrittenIndex) -> Self {
3291        Self {
3292            old_id: Some((&value.old_id).into()),
3293            new_id: Some((&value.new_id).into()),
3294            new_index_details: Some(value.new_index_details.clone()),
3295            new_index_version: value.new_index_version,
3296        }
3297    }
3298}
3299
3300impl From<&RewriteGroup> for pb::transaction::rewrite::RewriteGroup {
3301    fn from(value: &RewriteGroup) -> Self {
3302        Self {
3303            old_fragments: value
3304                .old_fragments
3305                .iter()
3306                .map(pb::DataFragment::from)
3307                .collect(),
3308            new_fragments: value
3309                .new_fragments
3310                .iter()
3311                .map(pb::DataFragment::from)
3312                .collect(),
3313        }
3314    }
3315}
3316
/// Validate the operation is valid for the given manifest.
///
/// `manifest` is `None` when the dataset does not yet exist; only
/// `Overwrite` (dataset creation) and `Clone` are allowed in that case.
/// For an existing dataset, operations that carry fragments and/or a schema
/// are checked for schema/fragment consistency; all other operations pass.
pub fn validate_operation(manifest: Option<&Manifest>, operation: &Operation) -> Result<()> {
    let manifest = match (manifest, operation) {
        (
            None,
            Operation::Overwrite {
                fragments, schema, ..
            },
        ) => {
            // Validate here because we are going to return early.
            schema_fragments_valid(None, schema, fragments)?;

            return Ok(());
        }
        // Cloning into a new location requires no existing manifest.
        (None, Operation::Clone { .. }) => return Ok(()),
        (Some(manifest), _) => manifest,
        (None, _) => {
            return Err(Error::invalid_input(format!(
                "Cannot apply operation {} to non-existent dataset",
                operation.name()
            )));
        }
    };

    match operation {
        Operation::Append { fragments } => {
            // Fragments must contain all fields in the schema
            schema_fragments_valid(Some(manifest), &manifest.schema, fragments)
        }
        Operation::Project { schema } => {
            schema_fragments_valid(Some(manifest), schema, manifest.fragments.as_ref())
        }
        Operation::Merge { fragments, schema } => {
            merge_fragments_valid(manifest, fragments)?;
            schema_fragments_valid(Some(manifest), schema, fragments)
        }
        // NOTE(review): only Overwrite WITHOUT config upserts is validated
        // here; an Overwrite carrying `config_upsert_values: Some(..)` falls
        // through to the catch-all below and skips schema validation —
        // confirm this is intended.
        Operation::Overwrite {
            fragments,
            schema,
            config_upsert_values: None,
            initial_bases: _,
        } => schema_fragments_valid(Some(manifest), schema, fragments),
        Operation::Update {
            updated_fragments,
            new_fragments,
            ..
        } => {
            schema_fragments_valid(Some(manifest), &manifest.schema, updated_fragments)?;
            schema_fragments_valid(Some(manifest), &manifest.schema, new_fragments)
        }
        _ => Ok(()),
    }
}
3370
3371fn schema_fragments_valid(
3372    manifest: Option<&Manifest>,
3373    schema: &Schema,
3374    fragments: &[Fragment],
3375) -> Result<()> {
3376    if let Some(manifest) = manifest
3377        && manifest.data_storage_format.lance_file_version()? == LanceFileVersion::Legacy
3378    {
3379        return schema_fragments_legacy_valid(schema, fragments);
3380    }
3381    // validate that each data file at least contains one field.
3382    for fragment in fragments {
3383        for data_file in &fragment.files {
3384            if data_file.fields.iter().len() == 0 {
3385                return Err(Error::invalid_input(format!(
3386                    "Datafile {} does not contain any fields",
3387                    data_file.path
3388                )));
3389            }
3390        }
3391    }
3392    Ok(())
3393}
3394
3395/// Check that each fragment contains all fields in the schema.
3396/// It is not required that the schema contains all fields in the fragment.
3397/// There may be masked fields.
3398fn schema_fragments_legacy_valid(schema: &Schema, fragments: &[Fragment]) -> Result<()> {
3399    // TODO: add additional validation. Consider consolidating with various
3400    // validate() methods in the codebase.
3401    for fragment in fragments {
3402        for field in schema.fields_pre_order() {
3403            if !fragment
3404                .files
3405                .iter()
3406                .flat_map(|f| f.fields.iter())
3407                .any(|f_id| f_id == &field.id)
3408            {
3409                return Err(Error::invalid_input(format!(
3410                    "Fragment {} does not contain field {:?}",
3411                    fragment.id, field
3412                )));
3413            }
3414        }
3415    }
3416    Ok(())
3417}
3418
3419/// Validate that Merge operations preserve all original fragments.
3420/// Merge operations should only add columns or rows, not reduce fragments.
3421/// This ensures fragments correspond at one-to-one with the original fragment list.
3422fn merge_fragments_valid(manifest: &Manifest, new_fragments: &[Fragment]) -> Result<()> {
3423    let original_fragments = manifest.fragments.as_ref();
3424
3425    // Additional validation: ensure we're not accidentally reducing the fragment count
3426    if new_fragments.len() < original_fragments.len() {
3427        return Err(Error::invalid_input(format!(
3428            "Merge operation reduced fragment count from {} to {}. \
3429             Merge operations should only add columns, not reduce fragments.",
3430            original_fragments.len(),
3431            new_fragments.len()
3432        )));
3433    }
3434
3435    // Collect new fragment IDs
3436    let new_fragment_map: HashMap<u64, &Fragment> =
3437        new_fragments.iter().map(|f| (f.id, f)).collect();
3438
3439    // Check that all original fragments are preserved in the new fragments list
3440    // Validate that each original fragment's metadata is preserved
3441    let mut missing_fragments: Vec<u64> = Vec::new();
3442    for original_fragment in original_fragments {
3443        if let Some(new_fragment) = new_fragment_map.get(&original_fragment.id) {
3444            // Validate physical_rows (row count) hasn't changed
3445            if original_fragment.physical_rows != new_fragment.physical_rows {
3446                return Err(Error::invalid_input(format!(
3447                    "Merge operation changed row count for fragment {}. \
3448                     Original: {:?}, New: {:?}. \
3449                     Merge operations should preserve fragment row counts and only add new columns.",
3450                    original_fragment.id,
3451                    original_fragment.physical_rows,
3452                    new_fragment.physical_rows
3453                )));
3454            }
3455        } else {
3456            missing_fragments.push(original_fragment.id);
3457        }
3458    }
3459
3460    if !missing_fragments.is_empty() {
3461        return Err(Error::invalid_input(format!(
3462            "Merge operation is missing original fragments: {:?}. \
3463             Merge operations should preserve all original fragments and only add new columns. \
3464             Expected fragments: {:?}, but got: {:?}",
3465            missing_fragments,
3466            original_fragments.iter().map(|f| f.id).collect::<Vec<_>>(),
3467            new_fragment_map.keys().copied().collect::<Vec<_>>()
3468        )));
3469    }
3470
3471    Ok(())
3472}
3473
3474#[cfg(test)]
3475mod tests {
3476    use super::*;
3477    use lance_io::utils::CachedFileSize;
3478
3479    #[test]
3480    fn test_rewrite_fragments() {
3481        let existing_fragments: Vec<Fragment> = (0..10).map(Fragment::new).collect();
3482
3483        let mut final_fragments = existing_fragments;
3484        let rewrite_groups = vec![
3485            // Since these are contiguous, they will be put in the same location
3486            // as 1 and 2.
3487            RewriteGroup {
3488                old_fragments: vec![Fragment::new(1), Fragment::new(2)],
3489                // These two fragments were previously reserved
3490                new_fragments: vec![Fragment::new(15), Fragment::new(16)],
3491            },
3492            // These are not contiguous, so they will be inserted at the end.
3493            RewriteGroup {
3494                old_fragments: vec![Fragment::new(5), Fragment::new(8)],
3495                // We pretend this id was not reserved.  Does not happen in practice today
3496                // but we want to leave the door open.
3497                new_fragments: vec![Fragment::new(0)],
3498            },
3499        ];
3500
3501        let mut fragment_id = 20;
3502        let version = 0;
3503
3504        Transaction::handle_rewrite_fragments(
3505            &mut final_fragments,
3506            &rewrite_groups,
3507            &mut fragment_id,
3508            version,
3509            None,
3510        )
3511        .unwrap();
3512
3513        assert_eq!(fragment_id, 21);
3514
3515        let expected_fragments: Vec<Fragment> = vec![
3516            Fragment::new(0),
3517            Fragment::new(15),
3518            Fragment::new(16),
3519            Fragment::new(3),
3520            Fragment::new(4),
3521            Fragment::new(6),
3522            Fragment::new(7),
3523            Fragment::new(9),
3524            Fragment::new(20),
3525        ];
3526
3527        assert_eq!(final_fragments, expected_fragments);
3528    }
3529
3530    #[test]
3531    fn test_merge_fragments_valid() {
3532        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
3533        use lance_core::datatypes::Schema as LanceSchema;
3534        use lance_table::format::Manifest;
3535        use std::sync::Arc;
3536
3537        // Create a simple schema for testing
3538        let schema = ArrowSchema::new(vec![
3539            ArrowField::new("id", DataType::Int32, false),
3540            ArrowField::new("name", DataType::Utf8, false),
3541        ]);
3542
3543        // Create original fragments
3544        let original_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)];
3545
3546        // Create a manifest with original fragments
3547        let manifest = Manifest::new(
3548            LanceSchema::try_from(&schema).unwrap(),
3549            Arc::new(original_fragments),
3550            DataStorageFormat::new(LanceFileVersion::V2_0),
3551            HashMap::new(),
3552        );
3553
3554        // Test 1: Empty fragments should fail
3555        let empty_fragments = vec![];
3556        let result = merge_fragments_valid(&manifest, &empty_fragments);
3557        assert!(result.is_err());
3558        assert!(
3559            result
3560                .unwrap_err()
3561                .to_string()
3562                .contains("reduced fragment count")
3563        );
3564
3565        // Test 2: Missing original fragments should fail
3566        let missing_fragments = vec![
3567            Fragment::new(1),
3568            Fragment::new(2),
3569            // Fragment 3 is missing
3570            Fragment::new(4), // New fragment
3571        ];
3572        let result = merge_fragments_valid(&manifest, &missing_fragments);
3573        assert!(result.is_err());
3574        assert!(
3575            result
3576                .unwrap_err()
3577                .to_string()
3578                .contains("missing original fragments")
3579        );
3580
3581        // Test 3: Reduced fragment count should fail
3582        let reduced_fragments = vec![
3583            Fragment::new(1),
3584            Fragment::new(2),
3585            // Fragment 3 is missing, no new fragments added
3586        ];
3587        let result = merge_fragments_valid(&manifest, &reduced_fragments);
3588        assert!(result.is_err());
3589        assert!(
3590            result
3591                .unwrap_err()
3592                .to_string()
3593                .contains("reduced fragment count")
3594        );
3595
3596        // Test 4: Valid merge with all original fragments plus new ones should succeed
3597        let valid_fragments = vec![
3598            Fragment::new(1),
3599            Fragment::new(2),
3600            Fragment::new(3),
3601            Fragment::new(4), // New fragment
3602            Fragment::new(5), // Another new fragment
3603        ];
3604        let result = merge_fragments_valid(&manifest, &valid_fragments);
3605        assert!(result.is_ok());
3606
3607        // Test 5: Same fragments (no new ones) should succeed
3608        let same_fragments = vec![Fragment::new(1), Fragment::new(2), Fragment::new(3)];
3609        let result = merge_fragments_valid(&manifest, &same_fragments);
3610        assert!(result.is_ok());
3611    }
3612
3613    #[test]
3614    fn test_remove_tombstoned_data_files() {
3615        // Create a fragment with mixed data files: some normal, some fully tombstoned
3616        let mut fragment = Fragment::new(1);
3617
3618        // Add a normal data file with valid field IDs
3619        fragment.files.push(DataFile {
3620            path: "normal.lance".to_string(),
3621            fields: vec![1, 2, 3],
3622            column_indices: vec![],
3623            file_major_version: 2,
3624            file_minor_version: 0,
3625            file_size_bytes: CachedFileSize::new(1000),
3626            base_id: None,
3627        });
3628
3629        // Add a data file with all fields tombstoned
3630        fragment.files.push(DataFile {
3631            path: "all_tombstoned.lance".to_string(),
3632            fields: vec![-2, -2, -2],
3633            column_indices: vec![],
3634            file_major_version: 2,
3635            file_minor_version: 0,
3636            file_size_bytes: CachedFileSize::new(500),
3637            base_id: None,
3638        });
3639
3640        // Add a data file with mixed tombstoned and valid fields
3641        fragment.files.push(DataFile {
3642            path: "mixed.lance".to_string(),
3643            fields: vec![4, -2, 5],
3644            column_indices: vec![],
3645            file_major_version: 2,
3646            file_minor_version: 0,
3647            file_size_bytes: CachedFileSize::new(750),
3648            base_id: None,
3649        });
3650
3651        // Add another fully tombstoned file
3652        fragment.files.push(DataFile {
3653            path: "another_tombstoned.lance".to_string(),
3654            fields: vec![-2],
3655            column_indices: vec![],
3656            file_major_version: 2,
3657            file_minor_version: 0,
3658            file_size_bytes: CachedFileSize::new(250),
3659            base_id: None,
3660        });
3661
3662        let mut fragments = vec![fragment];
3663
3664        // Apply the cleanup
3665        Transaction::remove_tombstoned_data_files(&mut fragments);
3666
3667        // Should have removed the two fully tombstoned files
3668        assert_eq!(fragments[0].files.len(), 2);
3669        assert_eq!(fragments[0].files[0].path, "normal.lance");
3670        assert_eq!(fragments[0].files[1].path, "mixed.lance");
3671    }
3672
3673    #[test]
3674    fn test_assign_row_ids_new_fragment() {
3675        // Test assigning row IDs to a fragment without existing row IDs
3676        let mut fragments = vec![Fragment {
3677            id: 1,
3678            physical_rows: Some(100),
3679            row_id_meta: None,
3680            files: vec![],
3681            deletion_file: None,
3682            last_updated_at_version_meta: None,
3683            created_at_version_meta: None,
3684        }];
3685        let mut next_row_id = 0;
3686
3687        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();
3688
3689        assert_eq!(next_row_id, 100);
3690        assert!(fragments[0].row_id_meta.is_some());
3691
3692        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
3693            let sequence = read_row_ids(data).unwrap();
3694            assert_eq!(sequence.len(), 100);
3695            let row_ids: Vec<u64> = sequence.iter().collect();
3696            assert_eq!(row_ids, (0..100).collect::<Vec<u64>>());
3697        } else {
3698            panic!("Expected inline row ID metadata");
3699        }
3700    }
3701
3702    #[test]
3703    fn test_assign_row_ids_existing_complete() {
3704        // Test with fragment that already has complete row IDs
3705        let existing_sequence = RowIdSequence::from(0..50);
3706        let serialized = write_row_ids(&existing_sequence);
3707
3708        let mut fragments = vec![Fragment {
3709            id: 1,
3710            physical_rows: Some(50),
3711            row_id_meta: Some(RowIdMeta::Inline(serialized)),
3712            files: vec![],
3713            deletion_file: None,
3714            last_updated_at_version_meta: None,
3715            created_at_version_meta: None,
3716        }];
3717        let mut next_row_id = 100;
3718
3719        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();
3720
3721        // next_row_id should not change
3722        assert_eq!(next_row_id, 100);
3723
3724        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
3725            let sequence = read_row_ids(data).unwrap();
3726            assert_eq!(sequence.len(), 50);
3727            let row_ids: Vec<u64> = sequence.iter().collect();
3728            assert_eq!(row_ids, (0..50).collect::<Vec<u64>>());
3729        } else {
3730            panic!("Expected inline row ID metadata");
3731        }
3732    }
3733
3734    #[test]
3735    fn test_assign_row_ids_partial_existing() {
3736        // Test with fragment that has partial row IDs (merge insert case)
3737        let existing_sequence = RowIdSequence::from(0..30);
3738        let serialized = write_row_ids(&existing_sequence);
3739
3740        let mut fragments = vec![Fragment {
3741            id: 1,
3742            physical_rows: Some(50), // More physical rows than existing row IDs
3743            row_id_meta: Some(RowIdMeta::Inline(serialized)),
3744            files: vec![],
3745            deletion_file: None,
3746            last_updated_at_version_meta: None,
3747            created_at_version_meta: None,
3748        }];
3749        let mut next_row_id = 100;
3750
3751        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();
3752
3753        // next_row_id should advance by 20 (50 - 30)
3754        assert_eq!(next_row_id, 120);
3755
3756        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
3757            let sequence = read_row_ids(data).unwrap();
3758            assert_eq!(sequence.len(), 50);
3759            let row_ids: Vec<u64> = sequence.iter().collect();
3760            // Should contain original 0-29 plus new 100-119
3761            let mut expected = (0..30).collect::<Vec<u64>>();
3762            expected.extend(100..120);
3763            assert_eq!(row_ids, expected);
3764        } else {
3765            panic!("Expected inline row ID metadata");
3766        }
3767    }
3768
3769    #[test]
3770    fn test_assign_row_ids_excess_row_ids() {
3771        // Test error case where fragment has more row IDs than physical rows
3772        let existing_sequence = RowIdSequence::from(0..60);
3773        let serialized = write_row_ids(&existing_sequence);
3774
3775        let mut fragments = vec![Fragment {
3776            id: 1,
3777            physical_rows: Some(50), // Less physical rows than existing row IDs
3778            row_id_meta: Some(RowIdMeta::Inline(serialized)),
3779            files: vec![],
3780            deletion_file: None,
3781            last_updated_at_version_meta: None,
3782            created_at_version_meta: None,
3783        }];
3784        let mut next_row_id = 100;
3785
3786        let result = Transaction::assign_row_ids(&mut next_row_id, &mut fragments);
3787
3788        assert!(result.is_err());
3789        if let Err(Error::Internal { message, .. }) = result {
3790            assert!(message.contains("more row IDs (60) than physical rows (50)"));
3791        } else {
3792            panic!("Expected Internal error about excess row IDs");
3793        }
3794    }
3795
3796    #[test]
3797    fn test_assign_row_ids_multiple_fragments() {
3798        // Test with multiple fragments, some with existing row IDs, some without
3799        let existing_sequence = RowIdSequence::from(500..520);
3800        let serialized = write_row_ids(&existing_sequence);
3801
3802        let mut fragments = vec![
3803            Fragment {
3804                id: 1,
3805                physical_rows: Some(30), // No existing row IDs
3806                row_id_meta: None,
3807                files: vec![],
3808                deletion_file: None,
3809                last_updated_at_version_meta: None,
3810                created_at_version_meta: None,
3811            },
3812            Fragment {
3813                id: 2,
3814                physical_rows: Some(25), // Partial existing row IDs
3815                row_id_meta: Some(RowIdMeta::Inline(serialized)),
3816                files: vec![],
3817                deletion_file: None,
3818                last_updated_at_version_meta: None,
3819                created_at_version_meta: None,
3820            },
3821        ];
3822        let mut next_row_id = 1000;
3823
3824        Transaction::assign_row_ids(&mut next_row_id, &mut fragments).unwrap();
3825
3826        // Should advance by 30 (first fragment) + 5 (second fragment partial)
3827        assert_eq!(next_row_id, 1035);
3828
3829        // Check first fragment
3830        if let Some(RowIdMeta::Inline(data)) = &fragments[0].row_id_meta {
3831            let sequence = read_row_ids(data).unwrap();
3832            assert_eq!(sequence.len(), 30);
3833            let row_ids: Vec<u64> = sequence.iter().collect();
3834            assert_eq!(row_ids, (1000..1030).collect::<Vec<u64>>());
3835        } else {
3836            panic!("Expected inline row ID metadata for first fragment");
3837        }
3838
3839        // Check second fragment
3840        if let Some(RowIdMeta::Inline(data)) = &fragments[1].row_id_meta {
3841            let sequence = read_row_ids(data).unwrap();
3842            assert_eq!(sequence.len(), 25);
3843            let row_ids: Vec<u64> = sequence.iter().collect();
3844            // Should contain original 500-519 plus new 1030-1034
3845            let mut expected = (500..520).collect::<Vec<u64>>();
3846            expected.extend(1030..1035);
3847            assert_eq!(row_ids, expected);
3848        } else {
3849            panic!("Expected inline row ID metadata for second fragment");
3850        }
3851    }
3852
3853    #[test]
3854    fn test_assign_row_ids_missing_physical_rows() {
3855        // Test error case where fragment doesn't have physical_rows set
3856        let mut fragments = vec![Fragment {
3857            id: 1,
3858            physical_rows: None,
3859            row_id_meta: None,
3860            files: vec![],
3861            deletion_file: None,
3862            last_updated_at_version_meta: None,
3863            created_at_version_meta: None,
3864        }];
3865        let mut next_row_id = 0;
3866
3867        let result = Transaction::assign_row_ids(&mut next_row_id, &mut fragments);
3868
3869        assert!(result.is_err());
3870        if let Err(Error::Internal { message, .. }) = result {
3871            assert!(message.contains("Fragment does not have physical rows"));
3872        } else {
3873            panic!("Expected Internal error about missing physical rows");
3874        }
3875    }
3876
3877    // Helper functions for retain_relevant_indices tests
3878    fn create_test_index(
3879        name: &str,
3880        field_id: i32,
3881        dataset_version: u64,
3882        fragment_bitmap: Option<RoaringBitmap>,
3883        is_vector: bool,
3884    ) -> IndexMetadata {
3885        use prost_types::Any;
3886        use std::sync::Arc;
3887        use uuid::Uuid;
3888
3889        let index_details = if is_vector {
3890            Some(Arc::new(Any {
3891                type_url: "type.googleapis.com/lance.index.VectorIndexDetails".to_string(),
3892                value: vec![],
3893            }))
3894        } else {
3895            Some(Arc::new(Any {
3896                type_url: "type.googleapis.com/lance.index.ScalarIndexDetails".to_string(),
3897                value: vec![],
3898            }))
3899        };
3900
3901        IndexMetadata {
3902            uuid: Uuid::new_v4(),
3903            fields: vec![field_id],
3904            name: name.to_string(),
3905            dataset_version,
3906            fragment_bitmap,
3907            index_details,
3908            index_version: 1,
3909            created_at: None,
3910            base_id: None,
3911        }
3912    }
3913
3914    fn create_system_index(name: &str, field_id: i32) -> IndexMetadata {
3915        use prost_types::Any;
3916        use std::sync::Arc;
3917        use uuid::Uuid;
3918
3919        IndexMetadata {
3920            uuid: Uuid::new_v4(),
3921            fields: vec![field_id],
3922            name: name.to_string(),
3923            dataset_version: 1,
3924            fragment_bitmap: Some(RoaringBitmap::from_iter([1, 2])),
3925            index_details: Some(Arc::new(Any {
3926                type_url: "type.googleapis.com/lance.index.SystemIndexDetails".to_string(),
3927                value: vec![],
3928            })),
3929            index_version: 1,
3930            created_at: None,
3931            base_id: None,
3932        }
3933    }
3934
3935    fn create_test_schema(field_ids: &[i32]) -> Schema {
3936        use arrow_schema::{DataType, Field as ArrowField, Schema as ArrowSchema};
3937        use lance_core::datatypes::Schema as LanceSchema;
3938
3939        let fields: Vec<ArrowField> = field_ids
3940            .iter()
3941            .map(|id| ArrowField::new(format!("field_{}", id), DataType::Int32, false))
3942            .collect();
3943
3944        let arrow_schema = ArrowSchema::new(fields);
3945        let mut lance_schema = LanceSchema::try_from(&arrow_schema).unwrap();
3946
3947        // Assign field IDs
3948        for (i, field_id) in field_ids.iter().enumerate() {
3949            lance_schema.mut_field_by_id(i as i32).unwrap().id = *field_id;
3950        }
3951
3952        lance_schema
3953    }
3954
3955    #[test]
3956    fn test_retain_indices_removes_missing_fields() {
3957        let schema = create_test_schema(&[1, 2]);
3958        let fragments = vec![Fragment::new(1), Fragment::new(2)];
3959
3960        let mut indices = vec![
3961            create_test_index("idx1", 1, 1, Some(RoaringBitmap::from_iter([1])), false),
3962            create_test_index("idx2", 2, 1, Some(RoaringBitmap::from_iter([1])), false),
3963            create_test_index("idx3", 99, 1, Some(RoaringBitmap::from_iter([1])), false), // Field doesn't exist
3964        ];
3965
3966        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
3967
3968        assert_eq!(indices.len(), 2);
3969        assert!(indices.iter().all(|idx| idx.fields[0] != 99));
3970    }
3971
3972    #[test]
3973    fn test_retain_indices_keeps_system_indices() {
3974        use lance_index::mem_wal::MEM_WAL_INDEX_NAME;
3975
3976        let schema = create_test_schema(&[1, 2]);
3977        let fragments = vec![Fragment::new(1)];
3978
3979        let mut indices = vec![
3980            create_system_index(FRAG_REUSE_INDEX_NAME, 99), // Field doesn't exist but should be kept
3981            create_system_index(MEM_WAL_INDEX_NAME, 99), // Field doesn't exist but should be kept
3982            create_test_index("regular_idx", 99, 1, Some(RoaringBitmap::new()), false), // Should be removed
3983        ];
3984
3985        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
3986
3987        assert_eq!(indices.len(), 2);
3988        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));
3989        assert!(indices.iter().any(|idx| idx.name == MEM_WAL_INDEX_NAME));
3990    }
3991
3992    #[test]
3993    fn test_retain_indices_keeps_fragment_reuse_index() {
3994        let schema = create_test_schema(&[1]);
3995        let fragments = vec![Fragment::new(1)];
3996
3997        let mut indices = vec![
3998            create_system_index(FRAG_REUSE_INDEX_NAME, 1),
3999            create_test_index("other_idx", 1, 1, Some(RoaringBitmap::new()), false),
4000        ];
4001
4002        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4003
4004        // Fragment reuse index should always be kept
4005        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));
4006    }
4007
4008    #[test]
4009    fn test_retain_single_empty_scalar_index() {
4010        let schema = create_test_schema(&[1]);
4011        let fragments = vec![Fragment::new(1)];
4012
4013        let mut indices = vec![create_test_index(
4014            "scalar_idx",
4015            1,
4016            1,
4017            Some(RoaringBitmap::new()), // Empty bitmap
4018            false,
4019        )];
4020
4021        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4022
4023        // Single empty scalar index should be kept
4024        assert_eq!(indices.len(), 1);
4025    }
4026
4027    #[test]
4028    fn test_retain_single_empty_vector_index() {
4029        let schema = create_test_schema(&[1]);
4030        let fragments = vec![Fragment::new(1)];
4031
4032        let mut indices = vec![create_test_index(
4033            "vector_idx",
4034            1,
4035            1,
4036            Some(RoaringBitmap::new()), // Empty bitmap
4037            true,
4038        )];
4039
4040        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4041
4042        // Single empty vector index should be removed
4043        assert_eq!(indices.len(), 0);
4044    }
4045
4046    #[test]
4047    fn test_retain_single_nonempty_index() {
4048        let schema = create_test_schema(&[1]);
4049        let fragments = vec![Fragment::new(1)];
4050
4051        let mut scalar_indices = vec![create_test_index(
4052            "scalar_idx",
4053            1,
4054            1,
4055            Some(RoaringBitmap::from_iter([1])),
4056            false,
4057        )];
4058
4059        let mut vector_indices = vec![create_test_index(
4060            "vector_idx",
4061            1,
4062            1,
4063            Some(RoaringBitmap::from_iter([1])),
4064            true,
4065        )];
4066
4067        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
4068        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);
4069
4070        // Both should be kept
4071        assert_eq!(scalar_indices.len(), 1);
4072        assert_eq!(vector_indices.len(), 1);
4073    }
4074
4075    #[test]
4076    fn test_retain_single_index_with_none_bitmap() {
4077        let schema = create_test_schema(&[1]);
4078        let fragments = vec![Fragment::new(1)];
4079
4080        let mut scalar_indices = vec![create_test_index("scalar_idx", 1, 1, None, false)];
4081        let mut vector_indices = vec![create_test_index("vector_idx", 1, 1, None, true)];
4082
4083        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
4084        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);
4085
4086        // Scalar should be kept, vector should be removed
4087        assert_eq!(scalar_indices.len(), 1);
4088        assert_eq!(vector_indices.len(), 0);
4089    }
4090
4091    #[test]
4092    fn test_retain_multiple_empty_scalar_indices_keeps_oldest() {
4093        let schema = create_test_schema(&[1]);
4094        let fragments = vec![Fragment::new(1)];
4095
4096        let mut indices = vec![
4097            create_test_index("idx", 1, 3, Some(RoaringBitmap::new()), false),
4098            create_test_index("idx", 1, 1, Some(RoaringBitmap::new()), false), // Oldest
4099            create_test_index("idx", 1, 2, Some(RoaringBitmap::new()), false),
4100        ];
4101
4102        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4103
4104        // Should keep only the oldest (dataset_version = 1)
4105        assert_eq!(indices.len(), 1);
4106        assert_eq!(indices[0].dataset_version, 1);
4107    }
4108
4109    #[test]
4110    fn test_retain_multiple_empty_vector_indices_removes_all() {
4111        let schema = create_test_schema(&[1]);
4112        let fragments = vec![Fragment::new(1)];
4113
4114        let mut indices = vec![
4115            create_test_index("vec_idx", 1, 1, Some(RoaringBitmap::new()), true),
4116            create_test_index("vec_idx", 1, 2, Some(RoaringBitmap::new()), true),
4117            create_test_index("vec_idx", 1, 3, Some(RoaringBitmap::new()), true),
4118        ];
4119
4120        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4121
4122        // All empty vector indices should be removed
4123        assert_eq!(indices.len(), 0);
4124    }
4125
4126    #[test]
4127    fn test_retain_mixed_empty_nonempty_keeps_nonempty() {
4128        let schema = create_test_schema(&[1]);
4129        let fragments = vec![Fragment::new(1)];
4130
4131        let mut indices = vec![
4132            create_test_index("idx", 1, 1, Some(RoaringBitmap::new()), false), // Empty
4133            create_test_index("idx", 1, 2, Some(RoaringBitmap::from_iter([1])), false), // Non-empty
4134            create_test_index("idx", 1, 3, Some(RoaringBitmap::new()), false), // Empty
4135            create_test_index("idx", 1, 4, Some(RoaringBitmap::from_iter([1])), false), // Non-empty
4136        ];
4137
4138        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4139
4140        // Should keep only non-empty indices
4141        assert_eq!(indices.len(), 2);
4142        assert!(
4143            indices
4144                .iter()
4145                .all(|idx| idx.dataset_version == 2 || idx.dataset_version == 4)
4146        );
4147    }
4148
4149    #[test]
4150    fn test_retain_mixed_empty_nonempty_vector_keeps_nonempty() {
4151        let schema = create_test_schema(&[1]);
4152        let fragments = vec![Fragment::new(1)];
4153
4154        let mut indices = vec![
4155            create_test_index("vec_idx", 1, 1, Some(RoaringBitmap::new()), true), // Empty
4156            create_test_index("vec_idx", 1, 2, Some(RoaringBitmap::from_iter([1])), true), // Non-empty
4157            create_test_index("vec_idx", 1, 3, Some(RoaringBitmap::new()), true),          // Empty
4158        ];
4159
4160        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4161
4162        // Should keep only non-empty index
4163        assert_eq!(indices.len(), 1);
4164        assert_eq!(indices[0].dataset_version, 2);
4165    }
4166
4167    #[test]
4168    fn test_retain_fragment_bitmap_with_nonexistent_fragments() {
4169        let schema = create_test_schema(&[1]);
4170        let fragments = vec![Fragment::new(1), Fragment::new(2)]; // Only fragments 1 and 2 exist
4171
4172        let mut indices = vec![create_test_index(
4173            "idx",
4174            1,
4175            1,
4176            Some(RoaringBitmap::from_iter([1, 2, 3, 4])), // References non-existent fragments 3, 4
4177            false,
4178        )];
4179
4180        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4181
4182        // Should still keep the index (effective bitmap will be intersection with existing)
4183        assert_eq!(indices.len(), 1);
4184        // Original bitmap should be unchanged
4185        assert_eq!(
4186            indices[0].fragment_bitmap.as_ref().unwrap(),
4187            &RoaringBitmap::from_iter([1, 2, 3, 4])
4188        );
4189    }
4190
4191    #[test]
4192    fn test_retain_effective_empty_bitmap_single_index() {
4193        let schema = create_test_schema(&[1]);
4194        let fragments = vec![Fragment::new(5), Fragment::new(6)];
4195
4196        // Bitmap references fragments that don't exist, so effective bitmap is empty
4197        let mut scalar_indices = vec![create_test_index(
4198            "scalar_idx",
4199            1,
4200            1,
4201            Some(RoaringBitmap::from_iter([1, 2, 3])),
4202            false,
4203        )];
4204
4205        let mut vector_indices = vec![create_test_index(
4206            "vector_idx",
4207            1,
4208            1,
4209            Some(RoaringBitmap::from_iter([1, 2, 3])),
4210            true,
4211        )];
4212
4213        Transaction::retain_relevant_indices(&mut scalar_indices, &schema, &fragments);
4214        Transaction::retain_relevant_indices(&mut vector_indices, &schema, &fragments);
4215
4216        // Scalar should be kept (single index, even if effective bitmap is empty)
4217        // Vector should be removed (empty effective bitmap)
4218        assert_eq!(scalar_indices.len(), 1);
4219        assert_eq!(vector_indices.len(), 0);
4220    }
4221
4222    #[test]
4223    fn test_retain_different_index_names() {
4224        let schema = create_test_schema(&[1]);
4225        let fragments = vec![Fragment::new(1)];
4226
4227        let mut indices = vec![
4228            create_test_index("idx_a", 1, 1, Some(RoaringBitmap::new()), false),
4229            create_test_index("idx_b", 1, 1, Some(RoaringBitmap::new()), true),
4230            create_test_index("idx_c", 1, 1, Some(RoaringBitmap::from_iter([1])), false),
4231        ];
4232
4233        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4234
4235        // idx_a (empty scalar) should be kept, idx_b (empty vector) removed, idx_c (non-empty) kept
4236        assert_eq!(indices.len(), 2);
4237        assert!(indices.iter().any(|idx| idx.name == "idx_a"));
4238        assert!(indices.iter().any(|idx| idx.name == "idx_c"));
4239        assert!(!indices.iter().any(|idx| idx.name == "idx_b"));
4240    }
4241
4242    #[test]
4243    fn test_retain_empty_indices_vec() {
4244        let schema = create_test_schema(&[1]);
4245        let fragments = vec![Fragment::new(1)];
4246
4247        let mut indices: Vec<IndexMetadata> = vec![];
4248
4249        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4250
4251        assert_eq!(indices.len(), 0);
4252    }
4253
4254    #[test]
4255    fn test_retain_all_indices_removed() {
4256        let schema = create_test_schema(&[1]);
4257        let fragments = vec![Fragment::new(1)];
4258
4259        let mut indices = vec![
4260            create_test_index("vec1", 1, 1, Some(RoaringBitmap::new()), true),
4261            create_test_index("vec2", 1, 1, Some(RoaringBitmap::new()), true),
4262            create_test_index("idx3", 99, 1, Some(RoaringBitmap::from_iter([1])), false), // Bad field
4263        ];
4264
4265        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4266
4267        assert_eq!(indices.len(), 0);
4268    }
4269
4270    #[test]
4271    fn test_retain_complex_scenario() {
4272        let schema = create_test_schema(&[1, 2]);
4273        let fragments = vec![Fragment::new(1), Fragment::new(2)];
4274
4275        let mut indices = vec![
4276            // System index - should always be kept
4277            create_system_index(FRAG_REUSE_INDEX_NAME, 1),
4278            // Group "idx_a" - all empty scalars, keep oldest
4279            create_test_index("idx_a", 1, 3, Some(RoaringBitmap::new()), false),
4280            create_test_index("idx_a", 1, 1, Some(RoaringBitmap::new()), false), // Oldest
4281            create_test_index("idx_a", 1, 2, Some(RoaringBitmap::new()), false),
4282            // Group "vec_b" - all empty vectors, remove all
4283            create_test_index("vec_b", 1, 1, Some(RoaringBitmap::new()), true),
4284            create_test_index("vec_b", 1, 2, Some(RoaringBitmap::new()), true),
4285            // Group "idx_c" - mixed empty/non-empty, keep non-empty
4286            create_test_index("idx_c", 2, 1, Some(RoaringBitmap::new()), false),
4287            create_test_index("idx_c", 2, 2, Some(RoaringBitmap::from_iter([1])), false), // Keep
4288            create_test_index("idx_c", 2, 3, Some(RoaringBitmap::from_iter([2])), false), // Keep
4289            // Single non-empty - keep
4290            create_test_index("idx_d", 1, 1, Some(RoaringBitmap::from_iter([1, 2])), false),
4291            // Index with bad field - remove
4292            create_test_index("idx_e", 99, 1, Some(RoaringBitmap::from_iter([1])), false),
4293        ];
4294
4295        Transaction::retain_relevant_indices(&mut indices, &schema, &fragments);
4296
4297        // Expected: frag_reuse, idx_a (oldest), idx_c (2 non-empty), idx_d = 5 total
4298        assert_eq!(indices.len(), 5);
4299
4300        // Verify system index kept
4301        assert!(indices.iter().any(|idx| idx.name == FRAG_REUSE_INDEX_NAME));
4302
4303        // Verify idx_a kept oldest only
4304        let idx_a_indices: Vec<_> = indices.iter().filter(|idx| idx.name == "idx_a").collect();
4305        assert_eq!(idx_a_indices.len(), 1);
4306        assert_eq!(idx_a_indices[0].dataset_version, 1);
4307
4308        // Verify vec_b all removed
4309        assert!(!indices.iter().any(|idx| idx.name == "vec_b"));
4310
4311        // Verify idx_c kept non-empty only
4312        let idx_c_indices: Vec<_> = indices.iter().filter(|idx| idx.name == "idx_c").collect();
4313        assert_eq!(idx_c_indices.len(), 2);
4314        assert!(
4315            idx_c_indices
4316                .iter()
4317                .all(|idx| idx.dataset_version == 2 || idx.dataset_version == 3)
4318        );
4319
4320        // Verify idx_d kept
4321        assert!(indices.iter().any(|idx| idx.name == "idx_d"));
4322
4323        // Verify idx_e removed (bad field)
4324        assert!(!indices.iter().any(|idx| idx.name == "idx_e"));
4325    }
4326
4327    #[test]
4328    fn test_handle_rewrite_indices_skips_missing_index() {
4329        use uuid::Uuid;
4330
4331        // Create an empty indices list
4332        let mut indices = vec![];
4333
4334        // Create rewritten_indices referring to a non-existent index
4335        let rewritten_indices = vec![RewrittenIndex {
4336            old_id: Uuid::new_v4(),
4337            new_id: Uuid::new_v4(),
4338            new_index_details: prost_types::Any {
4339                type_url: String::new(),
4340                value: vec![],
4341            },
4342            new_index_version: 1,
4343        }];
4344
4345        // Should succeed (skip missing index) instead of error
4346        let result = Transaction::handle_rewrite_indices(&mut indices, &rewritten_indices, &[]);
4347        assert!(result.is_ok());
4348        assert!(indices.is_empty());
4349    }
4350}