lance_table/format/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::Schema;
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name, None if the dataset is the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// The file position of the inline transaction content inside the manifest
    pub transaction_section: Option<usize>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different than configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// External base paths, keyed by base-path id.
    ///
    /// Data and deletion files may live under a base other than the dataset
    /// root (e.g. after a shallow clone); each referenced base is listed here.
    pub base_paths: HashMap<u32, BasePath>,
}
105
// We use the most significant bit to indicate that a transaction is detached
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true if `version` carries the detached-transaction marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
112
113fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
114    fragments
115        .iter()
116        .map(|f| f.num_rows().unwrap_or_default())
117        .chain([0]) // Make the last offset to be the full-length of the dataset.
118        .scan(0_usize, |offset, len| {
119            let start = *offset;
120            *offset += len;
121            Some(start)
122        })
123        .collect()
124}
125
/// Aggregate statistics for a [`Manifest`], computed by `Manifest::summary`.
#[derive(Default)]
pub struct ManifestSummary {
    /// Total number of fragments in the dataset.
    pub total_fragments: u64,
    /// Total number of data files across all fragments.
    pub total_data_files: u64,
    /// Total size of all data files, in bytes (only counting files whose
    /// size is recorded).
    pub total_files_size: u64,
    /// Total number of deletion files.
    pub total_deletion_files: u64,
    /// Total number of rows stored in data files (live rows + deleted rows).
    pub total_data_file_rows: u64,
    /// Total number of rows marked deleted by deletion files.
    pub total_deletion_file_rows: u64,
    /// Total number of live rows in the dataset.
    pub total_rows: u64,
}
136
137impl From<ManifestSummary> for BTreeMap<String, String> {
138    fn from(summary: ManifestSummary) -> Self {
139        let mut stats_map = Self::new();
140        stats_map.insert(
141            "total_fragments".to_string(),
142            summary.total_fragments.to_string(),
143        );
144        stats_map.insert(
145            "total_data_files".to_string(),
146            summary.total_data_files.to_string(),
147        );
148        stats_map.insert(
149            "total_files_size".to_string(),
150            summary.total_files_size.to_string(),
151        );
152        stats_map.insert(
153            "total_deletion_files".to_string(),
154            summary.total_deletion_files.to_string(),
155        );
156        stats_map.insert(
157            "total_data_file_rows".to_string(),
158            summary.total_data_file_rows.to_string(),
159        );
160        stats_map.insert(
161            "total_deletion_file_rows".to_string(),
162            summary.total_deletion_file_rows.to_string(),
163        );
164        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
165        stats_map
166    }
167}
168
impl Manifest {
    /// Create a brand-new manifest (version 1) for a dataset.
    ///
    /// Bookkeeping fields (timestamp, feature flags, index section, tag,
    /// transaction info, row-id counter) start at their empty defaults;
    /// they are populated on commit.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            base_paths,
        }
    }

    /// Create the next version of the manifest from `previous`.
    ///
    /// Persistent state (branch, config, table metadata, storage format,
    /// `max_fragment_id` / `next_row_id` high water marks, base paths) is
    /// carried over; the schema and fragment list are replaced.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
    ///
    /// Any data file or deletion file that does not already reference an
    /// external base is tagged with `ref_base_id`, and a new [`BasePath`]
    /// entry for `ref_path` is registered under that id.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                // Files without a base id live in the source dataset; point
                // them at the new base so they still resolve after the clone.
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            transaction_section: None,
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the 128-bit nanosecond counter into whole seconds plus the
        // sub-second remainder, as required by `DateTime::from_timestamp`.
        // Out-of-range values fall back to the Unix epoch via
        // `unwrap_or_default`.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value, expressed in nanoseconds since the
    /// Unix epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Set the `config` from an iterator
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, an `invalid_input` error is
    /// returned.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        // NOTE(review): the `try_into().unwrap()` below panics if a fragment
        // id exceeds u32::MAX (max_fragment_id is stored as u32).
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        // -1 serves as "no fields seen" so that max() works on empty inputs.
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        // Fragments with ids above `since`'s high water mark are "new";
        // when `since` has no fragments at all, everything is new.
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // On a miss, binary_search yields Err(insertion_point); the fragment
        // containing `start` begins at the previous offset, hence `idx - 1`.
        // (fragment_offsets[0] == 0, so the insertion point is never 0 here.)
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        // Walk forward until the fragment no longer overlaps [start, end).
        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// Whether the data files use the legacy (pre-v2) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Get the summary information of a manifest.
    ///
    /// This function calculates various statistics about the manifest, including:
    /// - total_files_size: Total size of all data files in bytes
    /// - total_fragments: Total number of fragments in the dataset
    /// - total_data_files: Total number of data files across all fragments
    /// - total_deletion_files: Total number of deletion files
    /// - total_data_file_rows: Total number of rows in data files
    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
    /// - total_rows: Total number of rows in the dataset
    pub fn summary(&self) -> ManifestSummary {
        // Calculate total fragments
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    // Count data files in the current fragment
                    summary.total_data_files += f.files.len() as u64;
                    // Sum the number of rows for the current fragment (if available)
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    // Sum file sizes for all data files in the current fragment (if available)
                    for data_file in &f.files {
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    // Check and count if the current fragment has a deletion file
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    // Sum the number of deleted rows from the deletion file (if available)
                    if let Some(deletion_file) = &f.deletion_file {
                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
                            summary.total_deletion_file_rows += num_deleted as u64;
                        }
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Rows physically present in data files = live rows + deleted rows.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
560
/// An external base location that data files may be stored under.
///
/// See [`Manifest::base_paths`]; fragments reference a base via its `id`.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique identifier for this base path within the manifest.
    pub id: u32,
    /// Optional human-readable name for this base.
    pub name: Option<String>,
    /// Whether this base is a dataset root (as opposed to a data-only base).
    pub is_dataset_root: bool,
    /// The full URI string (e.g., "s3://bucket/path")
    pub path: String,
}
569
570impl BasePath {
571    /// Create a new BasePath
572    ///
573    /// # Arguments
574    ///
575    /// * `id` - Unique identifier for this base path
576    /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path")
577    /// * `name` - Optional human-readable name for this base
578    /// * `is_dataset_root` - Whether this is the dataset root or a data-only base
579    pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
580        Self {
581            id,
582            name,
583            is_dataset_root,
584            path,
585        }
586    }
587
588    /// Extract the object store path from this BasePath's URI.
589    ///
590    /// This is a synchronous operation that parses the URI without initializing an object store.
591    pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
592        ObjectStore::extract_path_from_uri(registry, &self.path)
593    }
594}
595
impl DeepSizeOf for BasePath {
    // Hand-rolled heap-size estimate (the struct does not derive DeepSizeOf).
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // NOTE(review): `path` is counted twice (`* 2`); presumably this
        // accounts for a derived/cached copy of the path held elsewhere —
        // confirm, since a plain heap estimate would count it once.
        self.name.deep_size_of_children(context)
            + self.path.deep_size_of_children(context) * 2
            + size_of::<bool>()
    }
}
603
/// Identity and version of the library that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Clean semver version string ("major.minor.patch").
    pub version: String,
    /// Semver prerelease component, if any (e.g. "rc.1").
    pub prerelease: Option<String>,
    /// Semver build-metadata component, if any (e.g. "build.123").
    pub build_metadata: Option<String>,
}
611
/// The storage format of the data files in a dataset.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (e.g. "lance").
    pub file_format: String,
    /// File format version string, parseable as a [`LanceFileVersion`].
    pub version: String,
}
617
618const LANCE_FORMAT_NAME: &str = "lance";
619
620impl DataStorageFormat {
621    pub fn new(version: LanceFileVersion) -> Self {
622        Self {
623            file_format: LANCE_FORMAT_NAME.to_string(),
624            version: version.resolve().to_string(),
625        }
626    }
627
628    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
629        self.version.parse::<LanceFileVersion>()
630    }
631}
632
633impl Default for DataStorageFormat {
634    fn default() -> Self {
635        Self::new(LanceFileVersion::default())
636    }
637}
638
639impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
640    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
641        Self {
642            file_format: pb.file_format,
643            version: pb.version,
644        }
645    }
646}
647
/// Which component of a semantic version to bump (see [`WriterVersion::bump`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
654
655fn bump_version(version: &mut semver::Version, part: VersionPart) {
656    match part {
657        VersionPart::Major => {
658            version.major += 1;
659            version.minor = 0;
660            version.patch = 0;
661        }
662        VersionPart::Minor => {
663            version.minor += 1;
664            version.patch = 0;
665        }
666        VersionPart::Patch => {
667            version.patch += 1;
668        }
669    }
670}
671
impl WriterVersion {
    /// Split a version string into clean version (major.minor.patch), prerelease, and build metadata.
    ///
    /// Returns None if the input is not a valid semver string.
    ///
    /// For example:
    /// - "2.0.0-rc.1" -> Some(("2.0.0", Some("rc.1"), None))
    /// - "2.0.0-rc.1+build.123" -> Some(("2.0.0", Some("rc.1"), Some("build.123")))
    /// - "2.0.0+build.123" -> Some(("2.0.0", None, Some("build.123")))
    /// - "not-a-version" -> None
    fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
        let mut parsed = semver::Version::parse(full_version).ok()?;

        let prerelease = if parsed.pre.is_empty() {
            None
        } else {
            Some(parsed.pre.to_string())
        };

        let build_metadata = if parsed.build.is_empty() {
            None
        } else {
            Some(parsed.build.to_string())
        };

        // Remove prerelease and build metadata to get clean version
        parsed.pre = semver::Prerelease::EMPTY;
        parsed.build = semver::BuildMetadata::EMPTY;
        Some((parsed.to_string(), prerelease, build_metadata))
    }

    /// Try to parse the version string as a semver string. Returns None if
    /// not successful.
    ///
    /// NOTE: this hand-rolled parser only splits on the first '-'; a version
    /// containing build metadata ("1.2.3+build") will fail the numeric parse
    /// and return None. Prefer `lance_lib_version()`.
    #[deprecated(note = "Use `lance_lib_version()` instead")]
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        // First split by '-' to separate the version from the pre-release tag
        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
            (
                &self.version[..dash_idx],
                Some(&self.version[dash_idx + 1..]),
            )
        } else {
            (self.version.as_str(), None)
        };

        // Missing components default to "0" (e.g. "2.1" -> (2, 1, 0)).
        let mut parts = version_part.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;

        Some((major, minor, patch, tag))
    }

    /// If the library is "lance", parse the version as semver and return it.
    /// Returns None if the library is not "lance" or the version cannot be parsed as semver.
    ///
    /// This method reconstructs the full semantic version by combining the version field
    /// with the prerelease and build_metadata fields (if present). For example:
    /// - version="2.0.0" + prerelease=Some("rc.1") -> "2.0.0-rc.1"
    /// - version="2.0.0" + prerelease=Some("rc.1") + build_metadata=Some("build.123") -> "2.0.0-rc.1+build.123"
    pub fn lance_lib_version(&self) -> Option<semver::Version> {
        if self.library != "lance" {
            return None;
        }

        let mut version = semver::Version::parse(&self.version).ok()?;

        if let Some(ref prerelease) = self.prerelease {
            version.pre = semver::Prerelease::new(prerelease).ok()?;
        }

        if let Some(ref build_metadata) = self.build_metadata {
            version.build = semver::BuildMetadata::new(build_metadata).ok()?;
        }

        Some(version)
    }

    /// Parse the version as semver, panicking on failure.
    ///
    /// # Panics
    ///
    /// Panics if the version string cannot be parsed by `semver()`.
    #[deprecated(
        note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
    )]
    #[allow(deprecated)]
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Check if this is a Lance library version older than the given major/minor/patch.
    ///
    /// # Panics
    ///
    /// Panics if the library is not "lance" or the version cannot be parsed as semver.
    #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self
            .lance_lib_version()
            .expect("Not lance library or invalid version");
        // Comparison uses semver ordering (prereleases sort before releases).
        let other = semver::Version {
            major: major.into(),
            minor: minor.into(),
            patch: patch.into(),
            pre: semver::Prerelease::EMPTY,
            build: semver::BuildMetadata::EMPTY,
        };
        version < other
    }

    /// Return a copy with the given version component bumped.
    ///
    /// # Panics
    ///
    /// Panics if this is not a parseable lance version.
    #[deprecated(note = "This is meant for testing and will be made private in future version.")]
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let mut version = self.lance_lib_version().expect("Should be lance version");
        bump_version(&mut version, part);
        if !keep_tag {
            version.pre = semver::Prerelease::EMPTY;
        }
        // Re-split so the bumped result is stored in the same normalized
        // (version / prerelease / build_metadata) form as the original.
        let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
            .expect("Bumped version should be valid semver");
        Self {
            library: self.library.clone(),
            version: clean_version,
            prerelease,
            build_metadata,
        }
    }
}
796
impl Default for WriterVersion {
    /// The version of the running library, split into clean version,
    /// prerelease, and build-metadata components.
    #[cfg(not(test))]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
    }

    // Unit tests always run as if they are in the next version.
    #[cfg(test)]
    #[allow(deprecated)]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
        // Patch-bump (keeping any prerelease tag) so tests exercise
        // forward-compatibility paths.
        .bump(VersionPart::Patch, true)
    }
}
827
// `Manifest` round-trips through the `pb::Manifest` protobuf message.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
831
832impl From<pb::BasePath> for BasePath {
833    fn from(p: pb::BasePath) -> Self {
834        Self::new(p.id, p.path, p.name, p.is_dataset_root)
835    }
836}
837
838impl From<BasePath> for pb::BasePath {
839    fn from(p: BasePath) -> Self {
840        Self {
841            id: p.id,
842            name: p.name,
843            is_dataset_root: p.is_dataset_root,
844            path: p.path,
845        }
846    }
847}
848
849impl TryFrom<pb::Manifest> for Manifest {
850    type Error = Error;
851
852    fn try_from(p: pb::Manifest) -> Result<Self> {
853        let timestamp_nanos = p.timestamp.map(|ts| {
854            let sec = ts.seconds as u128 * 1e9 as u128;
855            let nanos = ts.nanos as u128;
856            sec + nanos
857        });
858        // We only use the writer version if it is fully set.
859        let writer_version = match p.writer_version {
860            Some(pb::manifest::WriterVersion {
861                library,
862                version,
863                prerelease,
864                build_metadata,
865            }) => Some(WriterVersion {
866                library,
867                version,
868                prerelease,
869                build_metadata,
870            }),
871            _ => None,
872        };
873        let fragments = Arc::new(
874            p.fragments
875                .into_iter()
876                .map(Fragment::try_from)
877                .collect::<Result<Vec<_>>>()?,
878        );
879        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
880        let fields_with_meta = FieldsWithMeta {
881            fields: Fields(p.fields),
882            metadata: p.schema_metadata,
883        };
884
885        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
886            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
887        {
888            return Err(Error::Internal {
889                message: "All fragments must have row ids".into(),
890                location: location!(),
891            });
892        }
893
894        let data_storage_format = match p.data_format {
895            None => {
896                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
897                    // If there are fragments, they are a better indicator
898                    DataStorageFormat::new(inferred_version)
899                } else {
900                    // No fragments to inspect, best we can do is look at writer flags
901                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
902                        DataStorageFormat::new(LanceFileVersion::Stable)
903                    } else {
904                        DataStorageFormat::new(LanceFileVersion::Legacy)
905                    }
906                }
907            }
908            Some(format) => DataStorageFormat::from(format),
909        };
910
911        let schema = Schema::from(fields_with_meta);
912
913        Ok(Self {
914            schema,
915            version: p.version,
916            branch: p.branch,
917            writer_version,
918            version_aux_data: p.version_aux_data as usize,
919            index_section: p.index_section.map(|i| i as usize),
920            timestamp_nanos: timestamp_nanos.unwrap_or(0),
921            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
922            reader_feature_flags: p.reader_feature_flags,
923            writer_feature_flags: p.writer_feature_flags,
924            max_fragment_id: p.max_fragment_id,
925            fragments,
926            transaction_file: if p.transaction_file.is_empty() {
927                None
928            } else {
929                Some(p.transaction_file)
930            },
931            transaction_section: p.transaction_section.map(|i| i as usize),
932            fragment_offsets,
933            next_row_id: p.next_row_id,
934            data_storage_format,
935            config: p.config,
936            table_metadata: p.table_metadata,
937            base_paths: p
938                .base_paths
939                .iter()
940                .map(|item| (item.id, item.clone().into()))
941                .collect(),
942        })
943    }
944}
945
946impl From<&Manifest> for pb::Manifest {
947    fn from(m: &Manifest) -> Self {
948        let timestamp_nanos = if m.timestamp_nanos == 0 {
949            None
950        } else {
951            let nanos = m.timestamp_nanos % 1e9 as u128;
952            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
953            Some(Timestamp {
954                seconds,
955                nanos: nanos as i32,
956            })
957        };
958        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
959        Self {
960            fields: fields_with_meta.fields.0,
961            schema_metadata: m
962                .schema
963                .metadata
964                .iter()
965                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
966                .collect(),
967            version: m.version,
968            branch: m.branch.clone(),
969            writer_version: m
970                .writer_version
971                .as_ref()
972                .map(|wv| pb::manifest::WriterVersion {
973                    library: wv.library.clone(),
974                    version: wv.version.clone(),
975                    prerelease: wv.prerelease.clone(),
976                    build_metadata: wv.build_metadata.clone(),
977                }),
978            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
979            table_metadata: m.table_metadata.clone(),
980            version_aux_data: m.version_aux_data as u64,
981            index_section: m.index_section.map(|i| i as u64),
982            timestamp: timestamp_nanos,
983            tag: m.tag.clone().unwrap_or_default(),
984            reader_feature_flags: m.reader_feature_flags,
985            writer_feature_flags: m.writer_feature_flags,
986            max_fragment_id: m.max_fragment_id,
987            transaction_file: m.transaction_file.clone().unwrap_or_default(),
988            next_row_id: m.next_row_id,
989            data_format: Some(pb::manifest::DataStorageFormat {
990                file_format: m.data_storage_format.file_format.clone(),
991                version: m.data_storage_format.version.clone(),
992            }),
993            config: m.config.clone(),
994            base_paths: m
995                .base_paths
996                .values()
997                .map(|base_path| pb::BasePath {
998                    id: base_path.id,
999                    name: base_path.name.clone(),
1000                    is_dataset_root: base_path.is_dataset_root,
1001                    path: base_path.path.clone(),
1002                })
1003                .collect(),
1004            transaction_section: m.transaction_section.map(|i| i as u64),
1005        }
1006    }
1007}
1008
1009#[async_trait]
1010pub trait SelfDescribingFileReader {
1011    /// Open a file reader without any cached schema
1012    ///
1013    /// In this case the schema will first need to be loaded
1014    /// from the file itself.
1015    ///
1016    /// When loading files from a dataset it is preferable to use
1017    /// the fragment reader to avoid this overhead.
1018    async fn try_new_self_described(
1019        object_store: &ObjectStore,
1020        path: &Path,
1021        cache: Option<&LanceCache>,
1022    ) -> Result<Self>
1023    where
1024        Self: Sized,
1025    {
1026        let reader = object_store.open(path).await?;
1027        Self::try_new_self_described_from_reader(reader.into(), cache).await
1028    }
1029
1030    async fn try_new_self_described_from_reader(
1031        reader: Arc<dyn Reader>,
1032        cache: Option<&LanceCache>,
1033    ) -> Result<Self>
1034    where
1035        Self: Sized;
1036}
1037
#[async_trait]
impl SelfDescribingFileReader for PreviousFileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        // Read the file's footer metadata first; it records where (and
        // whether) a manifest is embedded in the file.
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        // Decode the embedded manifest at the recorded offset.
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Legacy-format files keep dictionary values outside the manifest;
        // load them into the schema before handing it to the reader.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        // NOTE(review): the two `0` arguments are positional parameters of
        // `try_new_from_reader` (likely fragment/offset identifiers for the
        // standalone case) — confirm against its definition.
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
1071
// Unit tests for writer-version parsing/bumping and manifest-level helpers
// (offset-range lookup, max field id, config mutation, summary statistics).
#[cfg(test)]
mod tests {
    use crate::format::{DataFile, DeletionFile, DeletionFileType};
    use std::num::NonZero;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        // Parse the actual cargo version to check if it has a pre-release tag
        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        // Verify the version field contains only major.minor.patch
        let version_parts: Vec<&str> = wv.version.split('.').collect();
        assert_eq!(
            version_parts.len(),
            3,
            "Version should be major.minor.patch"
        );
        assert!(
            !wv.version.contains('-'),
            "Version field should not contain prerelease"
        );

        // Verify the prerelease field matches the expected tag
        assert_eq!(wv.prerelease.as_deref(), expected_tag);
        // Build metadata should be None for default version
        assert_eq!(wv.build_metadata, None);

        // Verify lance_lib_version() reconstructs the full semver correctly
        let version = wv.lance_lib_version().unwrap();
        assert_eq!(
            version.major,
            env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.minor,
            env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.patch,
            // Unit tests run against (major,minor,patch + 1)
            env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
        );
        assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));

        // Bumping any part must produce a strictly greater semver.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let mut bumped_version = version.clone();
            bump_version(&mut bumped_version, *part);
            assert!(version < bumped_version);
        }
    }

    #[test]
    fn test_writer_version_split() {
        // Test splitting version with prerelease
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, None);

        // Test splitting version without prerelease
        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, None);

        // Test splitting version with prerelease and build metadata
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Test splitting version with only build metadata
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Test with invalid version returns None
        assert!(WriterVersion::split_version("not-a-version").is_none());
    }

    #[test]
    fn test_writer_version_comparison_with_prerelease() {
        let v1 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        let v2 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        let semver1 = v1.lance_lib_version().unwrap();
        let semver2 = v2.lance_lib_version().unwrap();

        // rc.1 should be less than the release version
        assert!(semver1 < semver2);
    }

    #[test]
    fn test_writer_version_with_build_metadata() {
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: Some("build.123".to_string()),
        };

        let semver = v.lance_lib_version().unwrap();
        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
        assert_eq!(semver.major, 2);
        assert_eq!(semver.minor, 0);
        assert_eq!(semver.patch, 0);
        assert_eq!(semver.pre.as_str(), "rc.1");
        assert_eq!(semver.build.as_str(), "build.123");
    }

    #[test]
    fn test_writer_version_non_semver() {
        // Test that Lance library can have non-semver version strings
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "custom-build-v1".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // lance_lib_version should return None for non-semver
        assert!(v.lance_lib_version().is_none());

        // But the WriterVersion itself should still be valid and usable
        assert_eq!(v.library, "lance");
        assert_eq!(v.version, "custom-build-v1");
    }

    #[test]
    #[allow(deprecated)]
    fn test_older_than_with_prerelease() {
        // Test that older_than correctly handles prerelease
        let v_rc = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        // 2.0.0-rc.1 should be older than 2.0.0
        assert!(v_rc.older_than(2, 0, 0));

        // 2.0.0-rc.1 should be older than 2.0.1
        assert!(v_rc.older_than(2, 0, 1));

        // 2.0.0-rc.1 should not be older than 1.9.9
        assert!(!v_rc.older_than(1, 9, 9));

        let v_release = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // 2.0.0 should not be older than 2.0.0
        assert!(!v_release.older_than(2, 0, 0));

        // 2.0.0 should be older than 2.0.1
        assert!(v_release.older_than(2, 0, 1));
    }

    #[test]
    fn test_fragments_by_offset_range() {
        // Fragments of 10, 15, and 20 rows -> offset boundaries 0/10/25/45.
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1 references field id 43, larger than any schema field id;
        // max_field_id must consider data files, not just the schema.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // Insertions via config_mut should be reflected in manifest.config ...
        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        // ... and so should removals.
        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }

    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        //Just verify the transformation is OK
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
}