// lance_table/format/manifest.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors
4use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{Fields, FieldsWithMeta, populate_schema_dictionary};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LEGACY_FORMAT_VERSION, LanceFileVersion};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::Schema;
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name, None if the dataset is the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// The file position of the inline transaction content inside the manifest
    pub transaction_section: Option<usize>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different than configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// External base paths, keyed by base path id.
    pub base_paths: HashMap<u32, BasePath>,
}
104
// We use the most significant bit to indicate that a transaction is detached
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// True when `version` carries the detached-transaction marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
111
112fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
113    fragments
114        .iter()
115        .map(|f| f.num_rows().unwrap_or_default())
116        .chain([0]) // Make the last offset to be the full-length of the dataset.
117        .scan(0_usize, |offset, len| {
118            let start = *offset;
119            *offset += len;
120            Some(start)
121        })
122        .collect()
123}
124
/// Aggregate statistics about a [`Manifest`], produced by `Manifest::summary`.
#[derive(Default)]
pub struct ManifestSummary {
    // Number of fragments in the dataset.
    pub total_fragments: u64,
    // Number of data files across all fragments.
    pub total_data_files: u64,
    // Total size of all data files in bytes (where sizes are recorded).
    pub total_files_size: u64,
    // Number of deletion files across all fragments.
    pub total_deletion_files: u64,
    // Rows physically stored in data files (live rows + deleted rows).
    pub total_data_file_rows: u64,
    // Rows marked deleted by deletion files (where counts are recorded).
    pub total_deletion_file_rows: u64,
    // Live rows in the dataset.
    pub total_rows: u64,
}
135
136impl From<ManifestSummary> for BTreeMap<String, String> {
137    fn from(summary: ManifestSummary) -> Self {
138        let mut stats_map = Self::new();
139        stats_map.insert(
140            "total_fragments".to_string(),
141            summary.total_fragments.to_string(),
142        );
143        stats_map.insert(
144            "total_data_files".to_string(),
145            summary.total_data_files.to_string(),
146        );
147        stats_map.insert(
148            "total_files_size".to_string(),
149            summary.total_files_size.to_string(),
150        );
151        stats_map.insert(
152            "total_deletion_files".to_string(),
153            summary.total_deletion_files.to_string(),
154        );
155        stats_map.insert(
156            "total_data_file_rows".to_string(),
157            summary.total_data_file_rows.to_string(),
158        );
159        stats_map.insert(
160            "total_deletion_file_rows".to_string(),
161            summary.total_deletion_file_rows.to_string(),
162        );
163        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
164        stats_map
165    }
166}
167
impl Manifest {
    /// Create a version-1 manifest for a brand-new dataset.
    ///
    /// Commit-time fields (timestamp, feature flags, index/transaction
    /// positions) are left at their zero/None values; they are filled in
    /// when the manifest is committed.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            base_paths,
        }
    }

    /// Create the manifest for the next version of a dataset, carrying
    /// forward persistent state (branch, config, table metadata, base paths,
    /// storage format, and the row-id/fragment-id high water marks) from
    /// `previous`.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                // Any file not already tied to a base path now belongs to the
                // newly registered base (the source dataset's location).
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                // Same for the deletion file, if present.
                if let Some(deletion) = &mut cloned_fragment.deletion_file
                    && deletion.base_id.is_none()
                {
                    deletion.base_id = Some(ref_base_id);
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            transaction_section: None,
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            base_paths: {
                // Register the source dataset location as a new base path.
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the 128-bit nanosecond count into whole seconds plus the
        // sub-second remainder, as required by `DateTime::from_timestamp`.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the raw `timestamp_nanos` value (interpreted by [`Self::timestamp`]
    /// as nanoseconds since the Unix epoch).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Set the `config` from an iterator
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(format!(
                "Field with id {} does not exist for replace_field_metadata",
                field_id
            )))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::invalid_input(format!(
                "fragments_since: given version {} is newer than manifest version {}",
                since.version, self.version
            )));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: `Range<usize>`
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // When `start` is not exactly a fragment boundary, binary_search
        // returns Err with the insertion point; the containing fragment
        // starts at the previous offset entry.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        // Collect fragments until one starts at/after `end` (or lies entirely
        // before `start`).
        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True if the data files use the legacy Lance file format version.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Get the summary information of a manifest.
    ///
    /// This function calculates various statistics about the manifest, including:
    /// - total_files_size: Total size of all data files in bytes
    /// - total_fragments: Total number of fragments in the dataset
    /// - total_data_files: Total number of data files across all fragments
    /// - total_deletion_files: Total number of deletion files
    /// - total_data_file_rows: Total number of rows in data files
    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
    /// - total_rows: Total number of rows in the dataset
    pub fn summary(&self) -> ManifestSummary {
        // Calculate total fragments
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    // Count data files in the current fragment
                    summary.total_data_files += f.files.len() as u64;
                    // Sum the number of rows for the current fragment (if available)
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    // Sum file sizes for all data files in the current fragment (if available)
                    for data_file in &f.files {
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    // Check and count if the current fragment has a deletion file
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    // Sum the number of deleted rows from the deletion file (if available)
                    if let Some(deletion_file) = &f.deletion_file
                        && let Some(num_deleted) = deletion_file.num_deleted_rows
                    {
                        summary.total_deletion_file_rows += num_deleted as u64;
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Physical rows in the data files = live rows + rows marked deleted.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
553
/// A registered storage location that fragments' data files can reference
/// (via their `base_id`) instead of living under the dataset root.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique identifier; data files reference this id.
    pub id: u32,
    /// Optional human-readable name for this base.
    pub name: Option<String>,
    /// Whether this base is the dataset root (vs. a data-only location).
    pub is_dataset_root: bool,
    /// The full URI string (e.g., "s3://bucket/path")
    pub path: String,
}
562
563impl BasePath {
564    /// Create a new BasePath
565    ///
566    /// # Arguments
567    ///
568    /// * `id` - Unique identifier for this base path
569    /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path")
570    /// * `name` - Optional human-readable name for this base
571    /// * `is_dataset_root` - Whether this is the dataset root or a data-only base
572    pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
573        Self {
574            id,
575            name,
576            is_dataset_root,
577            path,
578        }
579    }
580
581    /// Extract the object store path from this BasePath's URI.
582    ///
583    /// This is a synchronous operation that parses the URI without initializing an object store.
584    pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
585        ObjectStore::extract_path_from_uri(registry, &self.path)
586    }
587}
588
589impl DeepSizeOf for BasePath {
590    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
591        self.name.deep_size_of_children(context)
592            + self.path.deep_size_of_children(context) * 2
593            + size_of::<bool>()
594    }
595}
596
/// Identity and semantic version of the library that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Library name, e.g. "lance".
    pub library: String,
    /// Clean "major.minor.patch" version string (no prerelease/build parts).
    pub version: String,
    /// Semver prerelease component (e.g. "rc.1"), if any.
    pub prerelease: Option<String>,
    /// Semver build metadata (e.g. "build.123"), if any.
    pub build_metadata: Option<String>,
}
604
/// The storage format of a dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name ("lance" for Lance-format files).
    pub file_format: String,
    /// Format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
610
611const LANCE_FORMAT_NAME: &str = "lance";
612
613impl DataStorageFormat {
614    pub fn new(version: LanceFileVersion) -> Self {
615        Self {
616            file_format: LANCE_FORMAT_NAME.to_string(),
617            version: version.resolve().to_string(),
618        }
619    }
620
621    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
622        self.version.parse::<LanceFileVersion>()
623    }
624}
625
626impl Default for DataStorageFormat {
627    fn default() -> Self {
628        Self::new(LanceFileVersion::default())
629    }
630}
631
632impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
633    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
634        Self {
635            file_format: pb.file_format,
636            version: pb.version,
637        }
638    }
639}
640
/// Which component of a semantic version to bump (see `bump_version`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// Increment major; reset minor and patch to 0.
    Major,
    /// Increment minor; reset patch to 0.
    Minor,
    /// Increment patch only.
    Patch,
}
647
648fn bump_version(version: &mut semver::Version, part: VersionPart) {
649    match part {
650        VersionPart::Major => {
651            version.major += 1;
652            version.minor = 0;
653            version.patch = 0;
654        }
655        VersionPart::Minor => {
656            version.minor += 1;
657            version.patch = 0;
658        }
659        VersionPart::Patch => {
660            version.patch += 1;
661        }
662    }
663}
664
impl WriterVersion {
    /// Split a version string into clean version (major.minor.patch), prerelease, and build metadata.
    ///
    /// Returns None if the input is not a valid semver string.
    ///
    /// For example:
    /// - "2.0.0-rc.1" -> Some(("2.0.0", Some("rc.1"), None))
    /// - "2.0.0-rc.1+build.123" -> Some(("2.0.0", Some("rc.1"), Some("build.123")))
    /// - "2.0.0+build.123" -> Some(("2.0.0", None, Some("build.123")))
    /// - "not-a-version" -> None
    fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
        let mut parsed = semver::Version::parse(full_version).ok()?;

        let prerelease = if parsed.pre.is_empty() {
            None
        } else {
            Some(parsed.pre.to_string())
        };

        let build_metadata = if parsed.build.is_empty() {
            None
        } else {
            Some(parsed.build.to_string())
        };

        // Remove prerelease and build metadata to get clean version
        parsed.pre = semver::Prerelease::EMPTY;
        parsed.build = semver::BuildMetadata::EMPTY;
        Some((parsed.to_string(), prerelease, build_metadata))
    }

    /// Try to parse the version string as a semver string. Returns None if
    /// not successful.
    ///
    /// NOTE(review): this hand-rolled parser only splits on '-', so build
    /// metadata without a prerelease (e.g. "2.0.0+build.1") leaves "+build.1"
    /// attached to the patch component and fails to parse. Prefer
    /// [`Self::lance_lib_version`].
    #[deprecated(note = "Use `lance_lib_version()` instead")]
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        // First split by '-' to separate the version from the pre-release tag
        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
            (
                &self.version[..dash_idx],
                Some(&self.version[dash_idx + 1..]),
            )
        } else {
            (self.version.as_str(), None)
        };

        let mut parts = version_part.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;

        Some((major, minor, patch, tag))
    }

    /// If the library is "lance", parse the version as semver and return it.
    /// Returns None if the library is not "lance" or the version cannot be parsed as semver.
    ///
    /// This method reconstructs the full semantic version by combining the version field
    /// with the prerelease and build_metadata fields (if present). For example:
    /// - version="2.0.0" + prerelease=Some("rc.1") -> "2.0.0-rc.1"
    /// - version="2.0.0" + prerelease=Some("rc.1") + build_metadata=Some("build.123") -> "2.0.0-rc.1+build.123"
    pub fn lance_lib_version(&self) -> Option<semver::Version> {
        if self.library != "lance" {
            return None;
        }

        let mut version = semver::Version::parse(&self.version).ok()?;

        if let Some(ref prerelease) = self.prerelease {
            version.pre = semver::Prerelease::new(prerelease).ok()?;
        }

        if let Some(ref build_metadata) = self.build_metadata {
            version.build = semver::BuildMetadata::new(build_metadata).ok()?;
        }

        Some(version)
    }

    #[deprecated(
        note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
    )]
    #[allow(deprecated)]
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Check if this is a Lance library version older than the given major/minor/patch.
    ///
    /// # Panics
    ///
    /// Panics if the library is not "lance" or the version cannot be parsed as semver.
    #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self
            .lance_lib_version()
            .expect("Not lance library or invalid version");
        // Compare against a release (no prerelease/build) version; semver
        // ordering places prereleases before the corresponding release.
        let other = semver::Version {
            major: major.into(),
            minor: minor.into(),
            patch: patch.into(),
            pre: semver::Prerelease::EMPTY,
            build: semver::BuildMetadata::EMPTY,
        };
        version < other
    }

    #[deprecated(note = "This is meant for testing and will be made private in future version.")]
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let mut version = self.lance_lib_version().expect("Should be lance version");
        bump_version(&mut version, part);
        if !keep_tag {
            version.pre = semver::Prerelease::EMPTY;
        }
        // Re-split so the struct's version/prerelease/build fields stay in
        // their normalized, separated form.
        let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
            .expect("Bumped version should be valid semver");
        Self {
            library: self.library.clone(),
            version: clean_version,
            prerelease,
            build_metadata,
        }
    }
}
789
790impl Default for WriterVersion {
791    #[cfg(not(test))]
792    fn default() -> Self {
793        let full_version = env!("CARGO_PKG_VERSION");
794        let (version, prerelease, build_metadata) =
795            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
796        Self {
797            library: "lance".to_string(),
798            version,
799            prerelease,
800            build_metadata,
801        }
802    }
803
804    // Unit tests always run as if they are in the next version.
805    #[cfg(test)]
806    #[allow(deprecated)]
807    fn default() -> Self {
808        let full_version = env!("CARGO_PKG_VERSION");
809        let (version, prerelease, build_metadata) =
810            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
811        Self {
812            library: "lance".to_string(),
813            version,
814            prerelease,
815            build_metadata,
816        }
817        .bump(VersionPart::Patch, true)
818    }
819}
820
/// Associates [`Manifest`] with its protobuf message type for generic
/// proto-backed reading/writing (e.g. `read_struct`).
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
824
825impl From<pb::BasePath> for BasePath {
826    fn from(p: pb::BasePath) -> Self {
827        Self::new(p.id, p.path, p.name, p.is_dataset_root)
828    }
829}
830
831impl From<BasePath> for pb::BasePath {
832    fn from(p: BasePath) -> Self {
833        Self {
834            id: p.id,
835            name: p.name,
836            is_dataset_root: p.is_dataset_root,
837            path: p.path,
838        }
839    }
840}
841
842impl TryFrom<pb::Manifest> for Manifest {
843    type Error = Error;
844
845    fn try_from(p: pb::Manifest) -> Result<Self> {
846        let timestamp_nanos = p.timestamp.map(|ts| {
847            let sec = ts.seconds as u128 * 1e9 as u128;
848            let nanos = ts.nanos as u128;
849            sec + nanos
850        });
851        // We only use the writer version if it is fully set.
852        let writer_version = match p.writer_version {
853            Some(pb::manifest::WriterVersion {
854                library,
855                version,
856                prerelease,
857                build_metadata,
858            }) => Some(WriterVersion {
859                library,
860                version,
861                prerelease,
862                build_metadata,
863            }),
864            _ => None,
865        };
866        let fragments = Arc::new(
867            p.fragments
868                .into_iter()
869                .map(Fragment::try_from)
870                .collect::<Result<Vec<_>>>()?,
871        );
872        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
873        let fields_with_meta = FieldsWithMeta {
874            fields: Fields(p.fields),
875            metadata: p.schema_metadata,
876        };
877
878        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
879            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
880        {
881            return Err(Error::internal("All fragments must have row ids"));
882        }
883
884        let data_storage_format = match p.data_format {
885            None => {
886                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
887                    // If there are fragments, they are a better indicator
888                    DataStorageFormat::new(inferred_version)
889                } else {
890                    // No fragments to inspect, best we can do is look at writer flags
891                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
892                        DataStorageFormat::new(LanceFileVersion::Stable)
893                    } else {
894                        DataStorageFormat::new(LanceFileVersion::Legacy)
895                    }
896                }
897            }
898            Some(format) => DataStorageFormat::from(format),
899        };
900
901        let schema = Schema::from(fields_with_meta);
902
903        Ok(Self {
904            schema,
905            version: p.version,
906            branch: p.branch,
907            writer_version,
908            version_aux_data: p.version_aux_data as usize,
909            index_section: p.index_section.map(|i| i as usize),
910            timestamp_nanos: timestamp_nanos.unwrap_or(0),
911            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
912            reader_feature_flags: p.reader_feature_flags,
913            writer_feature_flags: p.writer_feature_flags,
914            max_fragment_id: p.max_fragment_id,
915            fragments,
916            transaction_file: if p.transaction_file.is_empty() {
917                None
918            } else {
919                Some(p.transaction_file)
920            },
921            transaction_section: p.transaction_section.map(|i| i as usize),
922            fragment_offsets,
923            next_row_id: p.next_row_id,
924            data_storage_format,
925            config: p.config,
926            table_metadata: p.table_metadata,
927            base_paths: p
928                .base_paths
929                .iter()
930                .map(|item| (item.id, item.clone().into()))
931                .collect(),
932        })
933    }
934}
935
936impl From<&Manifest> for pb::Manifest {
937    fn from(m: &Manifest) -> Self {
938        let timestamp_nanos = if m.timestamp_nanos == 0 {
939            None
940        } else {
941            let nanos = m.timestamp_nanos % 1e9 as u128;
942            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
943            Some(Timestamp {
944                seconds,
945                nanos: nanos as i32,
946            })
947        };
948        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
949        Self {
950            fields: fields_with_meta.fields.0,
951            schema_metadata: m
952                .schema
953                .metadata
954                .iter()
955                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
956                .collect(),
957            version: m.version,
958            branch: m.branch.clone(),
959            writer_version: m
960                .writer_version
961                .as_ref()
962                .map(|wv| pb::manifest::WriterVersion {
963                    library: wv.library.clone(),
964                    version: wv.version.clone(),
965                    prerelease: wv.prerelease.clone(),
966                    build_metadata: wv.build_metadata.clone(),
967                }),
968            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
969            table_metadata: m.table_metadata.clone(),
970            version_aux_data: m.version_aux_data as u64,
971            index_section: m.index_section.map(|i| i as u64),
972            timestamp: timestamp_nanos,
973            tag: m.tag.clone().unwrap_or_default(),
974            reader_feature_flags: m.reader_feature_flags,
975            writer_feature_flags: m.writer_feature_flags,
976            max_fragment_id: m.max_fragment_id,
977            transaction_file: m.transaction_file.clone().unwrap_or_default(),
978            next_row_id: m.next_row_id,
979            data_format: Some(pb::manifest::DataStorageFormat {
980                file_format: m.data_storage_format.file_format.clone(),
981                version: m.data_storage_format.version.clone(),
982            }),
983            config: m.config.clone(),
984            base_paths: m
985                .base_paths
986                .values()
987                .map(|base_path| pb::BasePath {
988                    id: base_path.id,
989                    name: base_path.name.clone(),
990                    is_dataset_root: base_path.is_dataset_root,
991                    path: base_path.path.clone(),
992                })
993                .collect(),
994            transaction_section: m.transaction_section.map(|i| i as u64),
995        }
996    }
997}
998
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Like [`Self::try_new_self_described`], but starting from an
    /// already-opened [`Reader`] instead of an object-store path.
    ///
    /// Implementors provide this method; the path-based variant above is a
    /// default wrapper around it.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
1027
#[async_trait]
impl SelfDescribingFileReader for PreviousFileReader {
    // Opens a legacy-format file by reading its embedded manifest to recover
    // the schema, then constructs the reader from that schema.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        // A self-describing file must embed a manifest; without it the schema
        // cannot be recovered from the file alone.
        let manifest_position = metadata.manifest_position.ok_or(Error::internal(format!(
            "Attempt to open file at {} as self-describing but it did not contain a manifest",
            reader.path(),
        )))?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Legacy-format files keep dictionary values outside the manifest, so
        // they must be loaded back into the schema here.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        // NOTE(review): the two zero arguments match positional parameters of
        // try_new_from_reader not visible here — presumably fragment/offset
        // defaults; confirm against its signature.
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
1058
1059#[cfg(test)]
1060mod tests {
1061    use crate::format::{DataFile, DeletionFile, DeletionFileType};
1062    use std::num::NonZero;
1063
1064    use super::*;
1065
1066    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1067    use lance_core::datatypes::Field;
1068
    // Checks that the default WriterVersion decomposes CARGO_PKG_VERSION into
    // its core/prerelease parts and that lance_lib_version() reassembles it.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        // Parse the actual cargo version to check if it has a pre-release tag
        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        // Verify the version field contains only major.minor.patch
        let version_parts: Vec<&str> = wv.version.split('.').collect();
        assert_eq!(
            version_parts.len(),
            3,
            "Version should be major.minor.patch"
        );
        assert!(
            !wv.version.contains('-'),
            "Version field should not contain prerelease"
        );

        // Verify the prerelease field matches the expected tag
        assert_eq!(wv.prerelease.as_deref(), expected_tag);
        // Build metadata should be None for default version
        assert_eq!(wv.build_metadata, None);

        // Verify lance_lib_version() reconstructs the full semver correctly
        let version = wv.lance_lib_version().unwrap();
        assert_eq!(
            version.major,
            env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.minor,
            env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.patch,
            // Unit tests run against (major,minor,patch + 1)
            // (the cfg(test) Default impl bumps the patch version)
            env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
        );
        assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));

        // Bumping any single part must yield a strictly greater semver.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let mut bumped_version = version.clone();
            bump_version(&mut bumped_version, *part);
            assert!(version < bumped_version);
        }
    }
1122
1123    #[test]
1124    fn test_writer_version_split() {
1125        // Test splitting version with prerelease
1126        let (version, prerelease, build_metadata) =
1127            WriterVersion::split_version("2.0.0-rc.1").unwrap();
1128        assert_eq!(version, "2.0.0");
1129        assert_eq!(prerelease, Some("rc.1".to_string()));
1130        assert_eq!(build_metadata, None);
1131
1132        // Test splitting version without prerelease
1133        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
1134        assert_eq!(version, "2.0.0");
1135        assert_eq!(prerelease, None);
1136        assert_eq!(build_metadata, None);
1137
1138        // Test splitting version with prerelease and build metadata
1139        let (version, prerelease, build_metadata) =
1140            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
1141        assert_eq!(version, "2.0.0");
1142        assert_eq!(prerelease, Some("rc.1".to_string()));
1143        assert_eq!(build_metadata, Some("build.123".to_string()));
1144
1145        // Test splitting version with only build metadata
1146        let (version, prerelease, build_metadata) =
1147            WriterVersion::split_version("2.0.0+build.123").unwrap();
1148        assert_eq!(version, "2.0.0");
1149        assert_eq!(prerelease, None);
1150        assert_eq!(build_metadata, Some("build.123".to_string()));
1151
1152        // Test with invalid version returns None
1153        assert!(WriterVersion::split_version("not-a-version").is_none());
1154    }
1155
1156    #[test]
1157    fn test_writer_version_comparison_with_prerelease() {
1158        let v1 = WriterVersion {
1159            library: "lance".to_string(),
1160            version: "2.0.0".to_string(),
1161            prerelease: Some("rc.1".to_string()),
1162            build_metadata: None,
1163        };
1164
1165        let v2 = WriterVersion {
1166            library: "lance".to_string(),
1167            version: "2.0.0".to_string(),
1168            prerelease: None,
1169            build_metadata: None,
1170        };
1171
1172        let semver1 = v1.lance_lib_version().unwrap();
1173        let semver2 = v2.lance_lib_version().unwrap();
1174
1175        // rc.1 should be less than the release version
1176        assert!(semver1 < semver2);
1177    }
1178
1179    #[test]
1180    fn test_writer_version_with_build_metadata() {
1181        let v = WriterVersion {
1182            library: "lance".to_string(),
1183            version: "2.0.0".to_string(),
1184            prerelease: Some("rc.1".to_string()),
1185            build_metadata: Some("build.123".to_string()),
1186        };
1187
1188        let semver = v.lance_lib_version().unwrap();
1189        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
1190        assert_eq!(semver.major, 2);
1191        assert_eq!(semver.minor, 0);
1192        assert_eq!(semver.patch, 0);
1193        assert_eq!(semver.pre.as_str(), "rc.1");
1194        assert_eq!(semver.build.as_str(), "build.123");
1195    }
1196
1197    #[test]
1198    fn test_writer_version_non_semver() {
1199        // Test that Lance library can have non-semver version strings
1200        let v = WriterVersion {
1201            library: "lance".to_string(),
1202            version: "custom-build-v1".to_string(),
1203            prerelease: None,
1204            build_metadata: None,
1205        };
1206
1207        // lance_lib_version should return None for non-semver
1208        assert!(v.lance_lib_version().is_none());
1209
1210        // But the WriterVersion itself should still be valid and usable
1211        assert_eq!(v.library, "lance");
1212        assert_eq!(v.version, "custom-build-v1");
1213    }
1214
1215    #[test]
1216    #[allow(deprecated)]
1217    fn test_older_than_with_prerelease() {
1218        // Test that older_than correctly handles prerelease
1219        let v_rc = WriterVersion {
1220            library: "lance".to_string(),
1221            version: "2.0.0".to_string(),
1222            prerelease: Some("rc.1".to_string()),
1223            build_metadata: None,
1224        };
1225
1226        // 2.0.0-rc.1 should be older than 2.0.0
1227        assert!(v_rc.older_than(2, 0, 0));
1228
1229        // 2.0.0-rc.1 should be older than 2.0.1
1230        assert!(v_rc.older_than(2, 0, 1));
1231
1232        // 2.0.0-rc.1 should not be older than 1.9.9
1233        assert!(!v_rc.older_than(1, 9, 9));
1234
1235        let v_release = WriterVersion {
1236            library: "lance".to_string(),
1237            version: "2.0.0".to_string(),
1238            prerelease: None,
1239            build_metadata: None,
1240        };
1241
1242        // 2.0.0 should not be older than 2.0.0
1243        assert!(!v_release.older_than(2, 0, 0));
1244
1245        // 2.0.0 should be older than 2.0.1
1246        assert!(v_release.older_than(2, 0, 1));
1247    }
1248
1249    #[test]
1250    fn test_fragments_by_offset_range() {
1251        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1252            "a",
1253            arrow_schema::DataType::Int64,
1254            false,
1255        )]);
1256        let schema = Schema::try_from(&arrow_schema).unwrap();
1257        let fragments = vec![
1258            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1259            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1260            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1261        ];
1262        let manifest = Manifest::new(
1263            schema,
1264            Arc::new(fragments),
1265            DataStorageFormat::default(),
1266            HashMap::new(),
1267        );
1268
1269        let actual = manifest.fragments_by_offset_range(0..10);
1270        assert_eq!(actual.len(), 1);
1271        assert_eq!(actual[0].0, 0);
1272        assert_eq!(actual[0].1.id, 0);
1273
1274        let actual = manifest.fragments_by_offset_range(5..15);
1275        assert_eq!(actual.len(), 2);
1276        assert_eq!(actual[0].0, 0);
1277        assert_eq!(actual[0].1.id, 0);
1278        assert_eq!(actual[1].0, 10);
1279        assert_eq!(actual[1].1.id, 1);
1280
1281        let actual = manifest.fragments_by_offset_range(15..50);
1282        assert_eq!(actual.len(), 2);
1283        assert_eq!(actual[0].0, 10);
1284        assert_eq!(actual[0].1.id, 1);
1285        assert_eq!(actual[1].0, 25);
1286        assert_eq!(actual[1].1.id, 2);
1287
1288        // Out of range
1289        let actual = manifest.fragments_by_offset_range(45..100);
1290        assert!(actual.is_empty());
1291
1292        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
1293    }
1294
    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        // Schema's own max field id is 2 ...
        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // ... but a data file below references field id 43, which should win.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    // Field id 43 appears only here, in fragment 1's first file.
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // The maximum is taken across both the schema and all data files.
        assert_eq!(manifest.max_field_id(), 43);
    }
1346
1347    #[test]
1348    fn test_config() {
1349        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1350            "a",
1351            arrow_schema::DataType::Int64,
1352            false,
1353        )]);
1354        let schema = Schema::try_from(&arrow_schema).unwrap();
1355        let fragments = vec![
1356            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1357            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1358            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1359        ];
1360        let mut manifest = Manifest::new(
1361            schema,
1362            Arc::new(fragments),
1363            DataStorageFormat::default(),
1364            HashMap::new(),
1365        );
1366
1367        let mut config = manifest.config.clone();
1368        config.insert("lance.test".to_string(), "value".to_string());
1369        config.insert("other-key".to_string(), "other-value".to_string());
1370
1371        manifest.config_mut().extend(config.clone());
1372        assert_eq!(manifest.config, config.clone());
1373
1374        config.remove("other-key");
1375        manifest.config_mut().remove("other-key");
1376        assert_eq!(manifest.config, config);
1377    }
1378
    // Exercises Manifest::summary() across four manifests: empty, empty
    // files, real data, and data with a deletion file.
    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // With no fragments at all, every counter is zero.
        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // Fragments and files are counted even when they hold zero rows.
        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        // 50 physical rows, 10 of them deleted, in a 1000-byte file.
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // Deleted rows are subtracted from total_rows but reported separately.
        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // Just verify the BTreeMap conversion yields one entry per field.
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
1488}