lance_table/format/manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{Fields, FieldsWithMeta, populate_schema_dictionary};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LEGACY_FORMAT_VERSION, LanceFileVersion};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag};
20use crate::format::fragment::DataFileFieldInterner;
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::Schema;
24use lance_core::{Error, Result};
25use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
26use lance_io::utils::read_struct;
27
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name, None if the dataset is the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    /// (nanoseconds since the Unix epoch; see [`Manifest::timestamp`]).
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// The file position of the inline transaction content inside the manifest
    pub transaction_section: Option<usize>,

    /// Precomputed logical offset of each fragment (plus one trailing entry for
    /// the total row count), accelerating fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different than configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// External base paths referenced by data/deletion files, keyed by base path id.
    pub base_paths: HashMap<u32, BasePath>,
}
105
/// The most significant bit of a version number marks a detached transaction.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached-transaction bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
112
113fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
114    fragments
115        .iter()
116        .map(|f| f.num_rows().unwrap_or_default())
117        .chain([0]) // Make the last offset to be the full-length of the dataset.
118        .scan(0_usize, |offset, len| {
119            let start = *offset;
120            *offset += len;
121            Some(start)
122        })
123        .collect()
124}
125
/// Aggregate statistics over a [`Manifest`], computed by [`Manifest::summary`].
#[derive(Default)]
pub struct ManifestSummary {
    // Number of fragments in the dataset.
    pub total_fragments: u64,
    // Number of data files across all fragments.
    pub total_data_files: u64,
    // Total size of all data files in bytes (only files with a known size).
    pub total_files_size: u64,
    // Number of fragments that carry a deletion file.
    pub total_deletion_files: u64,
    // Number of rows stored in data files (live rows + deleted rows).
    pub total_data_file_rows: u64,
    // Number of rows marked deleted by deletion files (when known).
    pub total_deletion_file_rows: u64,
    // Number of live rows in the dataset.
    pub total_rows: u64,
}
136
137impl From<ManifestSummary> for BTreeMap<String, String> {
138    fn from(summary: ManifestSummary) -> Self {
139        let mut stats_map = Self::new();
140        stats_map.insert(
141            "total_fragments".to_string(),
142            summary.total_fragments.to_string(),
143        );
144        stats_map.insert(
145            "total_data_files".to_string(),
146            summary.total_data_files.to_string(),
147        );
148        stats_map.insert(
149            "total_files_size".to_string(),
150            summary.total_files_size.to_string(),
151        );
152        stats_map.insert(
153            "total_deletion_files".to_string(),
154            summary.total_deletion_files.to_string(),
155        );
156        stats_map.insert(
157            "total_data_file_rows".to_string(),
158            summary.total_data_file_rows.to_string(),
159        );
160        stats_map.insert(
161            "total_deletion_file_rows".to_string(),
162            summary.total_deletion_file_rows.to_string(),
163        );
164        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
165        stats_map
166    }
167}
168
impl Manifest {
    /// Create a brand-new manifest (version 1) with empty config/metadata and
    /// no feature flags set.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            base_paths,
        }
    }

    /// Create the next manifest version from `previous`, carrying forward the
    /// branch, fragment-id high water mark, row-id counter, storage format,
    /// config, metadata, and base paths, with the new schema and fragments.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);

        Self {
            schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            transaction_section: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
    ///
    /// Every data/deletion file without a `base_id` is pointed at the new base
    /// path `ref_base_id`, which is registered in `base_paths` under `ref_path`.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file
                    && deletion.base_id.is_none()
                {
                    deletion.base_id = Some(ref_base_id);
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            transaction_section: None,
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    ///
    /// Falls back to the Unix epoch if the value is out of `DateTime` range.
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the raw `timestamp_nanos` value (nanoseconds since the Unix epoch).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Set the `config` from an iterator
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    ///
    /// # Errors
    ///
    /// Returns `Error::invalid_input` if the field id is unknown (despite the
    /// "no-op" wording above — the error path is what the code does).
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(format!(
                "Field with id {} does not exist for replace_field_metadata",
                field_id
            )))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap(); // NOTE(review): panics if a fragment id exceeds u32::MAX — confirm ids are bounded

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    ///
    /// Returns -1 when there are no fields at all.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.iter()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    ///
    /// # Errors
    ///
    /// Returns `Error::invalid_input` if `since` is not an older version.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::invalid_input(format!(
                "fragments_since: given version {} is newer than manifest version {}",
                since.version, self.version
            )));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: `Range<usize>`
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // On a miss, binary_search returns the insertion point; the fragment
        // whose range contains `start` begins one slot earlier. Safe because
        // fragment_offsets always begins with 0, so a miss yields idx >= 1.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop once the fragment starts at/after the requested end, or
            // (first iteration only) skip a fragment that ends before `start`.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// Whether data files should be written with the legacy file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Get the summary information of a manifest.
    ///
    /// This function calculates various statistics about the manifest, including:
    /// - total_files_size: Total size of all data files in bytes
    /// - total_fragments: Total number of fragments in the dataset
    /// - total_data_files: Total number of data files across all fragments
    /// - total_deletion_files: Total number of deletion files
    /// - total_data_file_rows: Total number of rows in data files
    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
    /// - total_rows: Total number of rows in the dataset
    pub fn summary(&self) -> ManifestSummary {
        // Calculate total fragments
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    // Count data files in the current fragment
                    summary.total_data_files += f.files.len() as u64;
                    // Sum the number of rows for the current fragment (if available)
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    // Sum file sizes for all data files in the current fragment (if available)
                    for data_file in &f.files {
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    // Check and count if the current fragment has a deletion file
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    // Sum the number of deleted rows from the deletion file (if available)
                    if let Some(deletion_file) = &f.deletion_file
                        && let Some(num_deleted) = deletion_file.num_deleted_rows
                    {
                        summary.total_deletion_file_rows += num_deleted as u64;
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Physical rows in data files = live rows + rows masked by deletion files.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
554
/// A registered storage location that data/deletion files can reference by id,
/// allowing a dataset to span multiple roots (e.g. after a shallow clone).
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique identifier for this base path.
    pub id: u32,
    /// Optional human-readable name for this base.
    pub name: Option<String>,
    /// Whether this base is a dataset root (vs. a data-only base).
    pub is_dataset_root: bool,
    /// The full URI string (e.g., "s3://bucket/path")
    pub path: String,
}
563
564impl BasePath {
565    /// Create a new BasePath
566    ///
567    /// # Arguments
568    ///
569    /// * `id` - Unique identifier for this base path
570    /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path")
571    /// * `name` - Optional human-readable name for this base
572    /// * `is_dataset_root` - Whether this is the dataset root or a data-only base
573    pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
574        Self {
575            id,
576            name,
577            is_dataset_root,
578            path,
579        }
580    }
581
582    /// Extract the object store path from this BasePath's URI.
583    ///
584    /// This is a synchronous operation that parses the URI without initializing an object store.
585    pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
586        ObjectStore::extract_path_from_uri(registry, &self.path)
587    }
588}
589
impl DeepSizeOf for BasePath {
    /// Heap size attributed to this BasePath's children.
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // NOTE(review): `path` is counted twice (`* 2`) and `size_of::<bool>()`
        // is added even though a bool has no heap children, while `id` (Copy,
        // no heap data) is skipped. Presumably the doubling accounts for a
        // derived copy of the path held elsewhere — confirm; for a plain
        // String this overstates the heap usage.
        self.name.deep_size_of_children(context)
            + self.path.deep_size_of_children(context) * 2
            + size_of::<bool>()
    }
}
597
/// Identifies the library (and its semantic version) that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Library name, e.g. "lance".
    pub library: String,
    /// Clean "major.minor.patch" version string.
    pub version: String,
    /// Semver prerelease component (e.g. "rc.1"), if any.
    pub prerelease: Option<String>,
    /// Semver build metadata component (e.g. "build.123"), if any.
    pub build_metadata: Option<String>,
}
605
/// Describes how data files are stored: the file format name and its version.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (e.g. "lance").
    pub file_format: String,
    /// File format version string (parseable as a `LanceFileVersion`).
    pub version: String,
}
611
612const LANCE_FORMAT_NAME: &str = "lance";
613
614impl DataStorageFormat {
615    pub fn new(version: LanceFileVersion) -> Self {
616        Self {
617            file_format: LANCE_FORMAT_NAME.to_string(),
618            version: version.resolve().to_string(),
619        }
620    }
621
622    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
623        self.version.parse::<LanceFileVersion>()
624    }
625}
626
impl Default for DataStorageFormat {
    /// Defaults to the library's default Lance file version.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
632
633impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
634    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
635        Self {
636            file_format: pb.file_format,
637            version: pb.version,
638        }
639    }
640}
641
/// Which component of a semantic version to bump (see [`bump_version`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
648
649fn bump_version(version: &mut semver::Version, part: VersionPart) {
650    match part {
651        VersionPart::Major => {
652            version.major += 1;
653            version.minor = 0;
654            version.patch = 0;
655        }
656        VersionPart::Minor => {
657            version.minor += 1;
658            version.patch = 0;
659        }
660        VersionPart::Patch => {
661            version.patch += 1;
662        }
663    }
664}
665
impl WriterVersion {
    /// Split a version string into clean version (major.minor.patch), prerelease, and build metadata.
    ///
    /// Returns None if the input is not a valid semver string.
    ///
    /// For example:
    /// - "2.0.0-rc.1" -> Some(("2.0.0", Some("rc.1"), None))
    /// - "2.0.0-rc.1+build.123" -> Some(("2.0.0", Some("rc.1"), Some("build.123")))
    /// - "2.0.0+build.123" -> Some(("2.0.0", None, Some("build.123")))
    /// - "not-a-version" -> None
    fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
        let mut parsed = semver::Version::parse(full_version).ok()?;

        let prerelease = if parsed.pre.is_empty() {
            None
        } else {
            Some(parsed.pre.to_string())
        };

        let build_metadata = if parsed.build.is_empty() {
            None
        } else {
            Some(parsed.build.to_string())
        };

        // Remove prerelease and build metadata to get clean version
        parsed.pre = semver::Prerelease::EMPTY;
        parsed.build = semver::BuildMetadata::EMPTY;
        Some((parsed.to_string(), prerelease, build_metadata))
    }

    /// Try to parse the version string as a semver string. Returns None if
    /// not successful.
    ///
    /// NOTE(review): this manual parse only splits on '-', so a version with
    /// '+build' metadata and no prerelease (e.g. "2.0.0+build.1") fails the
    /// numeric patch parse and returns None. Deprecated in favor of the
    /// semver-crate-backed `lance_lib_version()`.
    #[deprecated(note = "Use `lance_lib_version()` instead")]
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        // First split by '-' to separate the version from the pre-release tag
        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
            (
                &self.version[..dash_idx],
                Some(&self.version[dash_idx + 1..]),
            )
        } else {
            (self.version.as_str(), None)
        };

        // Missing components default to 0, so "2.1" parses as (2, 1, 0).
        let mut parts = version_part.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;

        Some((major, minor, patch, tag))
    }

    /// If the library is "lance", parse the version as semver and return it.
    /// Returns None if the library is not "lance" or the version cannot be parsed as semver.
    ///
    /// This method reconstructs the full semantic version by combining the version field
    /// with the prerelease and build_metadata fields (if present). For example:
    /// - version="2.0.0" + prerelease=Some("rc.1") -> "2.0.0-rc.1"
    /// - version="2.0.0" + prerelease=Some("rc.1") + build_metadata=Some("build.123") -> "2.0.0-rc.1+build.123"
    pub fn lance_lib_version(&self) -> Option<semver::Version> {
        if self.library != "lance" {
            return None;
        }

        let mut version = semver::Version::parse(&self.version).ok()?;

        if let Some(ref prerelease) = self.prerelease {
            version.pre = semver::Prerelease::new(prerelease).ok()?;
        }

        if let Some(ref build_metadata) = self.build_metadata {
            version.build = semver::BuildMetadata::new(build_metadata).ok()?;
        }

        Some(version)
    }

    /// Like [`Self::semver`], but panics instead of returning None.
    #[deprecated(
        note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
    )]
    #[allow(deprecated)]
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Check if this is a Lance library version older than the given major/minor/patch.
    ///
    /// # Panics
    ///
    /// Panics if the library is not "lance" or the version cannot be parsed as semver.
    #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self
            .lance_lib_version()
            .expect("Not lance library or invalid version");
        let other = semver::Version {
            major: major.into(),
            minor: minor.into(),
            patch: patch.into(),
            pre: semver::Prerelease::EMPTY,
            build: semver::BuildMetadata::EMPTY,
        };
        version < other
    }

    /// Return a copy of this version bumped at `part`; `keep_tag` controls
    /// whether the prerelease tag is preserved.
    ///
    /// # Panics
    ///
    /// Panics if this is not a parseable "lance" version.
    #[deprecated(note = "This is meant for testing and will be made private in future version.")]
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let mut version = self.lance_lib_version().expect("Should be lance version");
        bump_version(&mut version, part);
        if !keep_tag {
            version.pre = semver::Prerelease::EMPTY;
        }
        let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
            .expect("Bumped version should be valid semver");
        Self {
            library: self.library.clone(),
            version: clean_version,
            prerelease,
            build_metadata,
        }
    }
}
790
impl Default for WriterVersion {
    /// The current library version, taken from Cargo at compile time.
    #[cfg(not(test))]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
    }

    // Unit tests always run as if they are in the next version.
    #[cfg(test)]
    #[allow(deprecated)]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
        // Patch-bump (keeping any prerelease tag) so tests see the "next" release.
        .bump(VersionPart::Patch, true)
    }
}
821
/// Ties [`Manifest`] to its protobuf message type so the generic proto
/// read/write helpers (e.g. `read_struct`) can be used with it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
825
826impl From<pb::BasePath> for BasePath {
827    fn from(p: pb::BasePath) -> Self {
828        Self::new(p.id, p.path, p.name, p.is_dataset_root)
829    }
830}
831
832impl From<BasePath> for pb::BasePath {
833    fn from(p: BasePath) -> Self {
834        Self {
835            id: p.id,
836            name: p.name,
837            is_dataset_root: p.is_dataset_root,
838            path: p.path,
839        }
840    }
841}
842
843impl TryFrom<pb::Manifest> for Manifest {
844    type Error = Error;
845
846    fn try_from(p: pb::Manifest) -> Result<Self> {
847        let timestamp_nanos = p.timestamp.map(|ts| {
848            let sec = ts.seconds as u128 * 1e9 as u128;
849            let nanos = ts.nanos as u128;
850            sec + nanos
851        });
852        // We only use the writer version if it is fully set.
853        let writer_version = match p.writer_version {
854            Some(pb::manifest::WriterVersion {
855                library,
856                version,
857                prerelease,
858                build_metadata,
859            }) => Some(WriterVersion {
860                library,
861                version,
862                prerelease,
863                build_metadata,
864            }),
865            _ => None,
866        };
867        let mut interner = DataFileFieldInterner::default();
868        let fragments = Arc::new(
869            p.fragments
870                .into_iter()
871                .map(|f| interner.intern_fragment(f))
872                .collect::<Result<Vec<_>>>()?,
873        );
874        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
875        let fields_with_meta = FieldsWithMeta {
876            fields: Fields(p.fields),
877            metadata: p.schema_metadata,
878        };
879
880        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
881            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
882        {
883            return Err(Error::internal("All fragments must have row ids"));
884        }
885
886        let data_storage_format = match p.data_format {
887            None => {
888                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
889                    // If there are fragments, they are a better indicator
890                    DataStorageFormat::new(inferred_version)
891                } else {
892                    // No fragments to inspect, best we can do is look at writer flags
893                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
894                        DataStorageFormat::new(LanceFileVersion::Stable)
895                    } else {
896                        DataStorageFormat::new(LanceFileVersion::Legacy)
897                    }
898                }
899            }
900            Some(format) => DataStorageFormat::from(format),
901        };
902
903        let schema = Schema::from(fields_with_meta);
904
905        Ok(Self {
906            schema,
907            version: p.version,
908            branch: p.branch,
909            writer_version,
910            version_aux_data: p.version_aux_data as usize,
911            index_section: p.index_section.map(|i| i as usize),
912            timestamp_nanos: timestamp_nanos.unwrap_or(0),
913            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
914            reader_feature_flags: p.reader_feature_flags,
915            writer_feature_flags: p.writer_feature_flags,
916            max_fragment_id: p.max_fragment_id,
917            fragments,
918            transaction_file: if p.transaction_file.is_empty() {
919                None
920            } else {
921                Some(p.transaction_file)
922            },
923            transaction_section: p.transaction_section.map(|i| i as usize),
924            fragment_offsets,
925            next_row_id: p.next_row_id,
926            data_storage_format,
927            config: p.config,
928            table_metadata: p.table_metadata,
929            base_paths: p
930                .base_paths
931                .iter()
932                .map(|item| (item.id, item.clone().into()))
933                .collect(),
934        })
935    }
936}
937
938impl From<&Manifest> for pb::Manifest {
939    fn from(m: &Manifest) -> Self {
940        let timestamp_nanos = if m.timestamp_nanos == 0 {
941            None
942        } else {
943            let nanos = m.timestamp_nanos % 1e9 as u128;
944            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
945            Some(Timestamp {
946                seconds,
947                nanos: nanos as i32,
948            })
949        };
950        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
951        Self {
952            fields: fields_with_meta.fields.0,
953            schema_metadata: m
954                .schema
955                .metadata
956                .iter()
957                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
958                .collect(),
959            version: m.version,
960            branch: m.branch.clone(),
961            writer_version: m
962                .writer_version
963                .as_ref()
964                .map(|wv| pb::manifest::WriterVersion {
965                    library: wv.library.clone(),
966                    version: wv.version.clone(),
967                    prerelease: wv.prerelease.clone(),
968                    build_metadata: wv.build_metadata.clone(),
969                }),
970            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
971            table_metadata: m.table_metadata.clone(),
972            version_aux_data: m.version_aux_data as u64,
973            index_section: m.index_section.map(|i| i as u64),
974            timestamp: timestamp_nanos,
975            tag: m.tag.clone().unwrap_or_default(),
976            reader_feature_flags: m.reader_feature_flags,
977            writer_feature_flags: m.writer_feature_flags,
978            max_fragment_id: m.max_fragment_id,
979            transaction_file: m.transaction_file.clone().unwrap_or_default(),
980            next_row_id: m.next_row_id,
981            data_format: Some(pb::manifest::DataStorageFormat {
982                file_format: m.data_storage_format.file_format.clone(),
983                version: m.data_storage_format.version.clone(),
984            }),
985            config: m.config.clone(),
986            base_paths: m
987                .base_paths
988                .values()
989                .map(|base_path| pb::BasePath {
990                    id: base_path.id,
991                    name: base_path.name.clone(),
992                    is_dataset_root: base_path.is_dataset_root,
993                    path: base_path.path.clone(),
994                })
995                .collect(),
996            transaction_section: m.transaction_section.map(|i| i as u64),
997        }
998    }
999}
1000
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Open a file reader from an already-opened [`Reader`], loading the
    /// schema from the file itself.
    ///
    /// `cache`, when provided, is passed through to the metadata read —
    /// presumably to cache file metadata; see implementations.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
1029
1030#[async_trait]
1031impl SelfDescribingFileReader for PreviousFileReader {
1032    async fn try_new_self_described_from_reader(
1033        reader: Arc<dyn Reader>,
1034        cache: Option<&LanceCache>,
1035    ) -> Result<Self> {
1036        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
1037        let manifest_position = metadata.manifest_position.ok_or(Error::internal(format!(
1038            "Attempt to open file at {} as self-describing but it did not contain a manifest",
1039            reader.path(),
1040        )))?;
1041        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
1042        if manifest.should_use_legacy_format() {
1043            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
1044        }
1045        let schema = manifest.schema;
1046        let max_field_id = schema.max_field_id().unwrap_or_default();
1047        Self::try_new_from_reader(
1048            reader.path(),
1049            reader.clone(),
1050            Some(metadata),
1051            schema,
1052            0,
1053            0,
1054            max_field_id,
1055            cache,
1056        )
1057        .await
1058    }
1059}
1060
#[cfg(test)]
mod tests {
    use crate::format::{DataFile, DeletionFile, DeletionFileType};
    use std::num::NonZero;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // Default WriterVersion reports library "lance" and a semver-shaped
    // version; in test builds it runs one patch release ahead.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        // Parse the actual cargo version to check if it has a pre-release tag
        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        // Verify the version field contains only major.minor.patch
        let version_parts: Vec<&str> = wv.version.split('.').collect();
        assert_eq!(
            version_parts.len(),
            3,
            "Version should be major.minor.patch"
        );
        assert!(
            !wv.version.contains('-'),
            "Version field should not contain prerelease"
        );

        // Verify the prerelease field matches the expected tag
        assert_eq!(wv.prerelease.as_deref(), expected_tag);
        // Build metadata should be None for default version
        assert_eq!(wv.build_metadata, None);

        // Verify lance_lib_version() reconstructs the full semver correctly
        let version = wv.lance_lib_version().unwrap();
        assert_eq!(
            version.major,
            env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.minor,
            env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.patch,
            // Unit tests run against (major,minor,patch + 1)
            env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
        );
        assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));

        // Bumping any single part must produce a strictly greater version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let mut bumped_version = version.clone();
            bump_version(&mut bumped_version, *part);
            assert!(version < bumped_version);
        }
    }

    // split_version separates "x.y.z[-pre][+build]" into its three parts and
    // returns None for non-semver input.
    #[test]
    fn test_writer_version_split() {
        // Test splitting version with prerelease
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, None);

        // Test splitting version without prerelease
        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, None);

        // Test splitting version with prerelease and build metadata
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Test splitting version with only build metadata
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Test with invalid version returns None
        assert!(WriterVersion::split_version("not-a-version").is_none());
    }

    // Semver ordering: a prerelease sorts before its release.
    #[test]
    fn test_writer_version_comparison_with_prerelease() {
        let v1 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        let v2 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        let semver1 = v1.lance_lib_version().unwrap();
        let semver2 = v2.lance_lib_version().unwrap();

        // rc.1 should be less than the release version
        assert!(semver1 < semver2);
    }

    // Build metadata round-trips through lance_lib_version().
    #[test]
    fn test_writer_version_with_build_metadata() {
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: Some("build.123".to_string()),
        };

        let semver = v.lance_lib_version().unwrap();
        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
        assert_eq!(semver.major, 2);
        assert_eq!(semver.minor, 0);
        assert_eq!(semver.patch, 0);
        assert_eq!(semver.pre.as_str(), "rc.1");
        assert_eq!(semver.build.as_str(), "build.123");
    }

    #[test]
    fn test_writer_version_non_semver() {
        // Test that Lance library can have non-semver version strings
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "custom-build-v1".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // lance_lib_version should return None for non-semver
        assert!(v.lance_lib_version().is_none());

        // But the WriterVersion itself should still be valid and usable
        assert_eq!(v.library, "lance");
        assert_eq!(v.version, "custom-build-v1");
    }

    #[test]
    #[allow(deprecated)]
    fn test_older_than_with_prerelease() {
        // Test that older_than correctly handles prerelease
        let v_rc = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        // 2.0.0-rc.1 should be older than 2.0.0
        assert!(v_rc.older_than(2, 0, 0));

        // 2.0.0-rc.1 should be older than 2.0.1
        assert!(v_rc.older_than(2, 0, 1));

        // 2.0.0-rc.1 should not be older than 1.9.9
        assert!(!v_rc.older_than(1, 9, 9));

        let v_release = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // 2.0.0 should not be older than 2.0.0
        assert!(!v_release.older_than(2, 0, 0));

        // 2.0.0 should be older than 2.0.1
        assert!(v_release.older_than(2, 0, 1));
    }

    // fragments_by_offset_range returns (starting row offset, fragment)
    // pairs for every fragment overlapping the requested row range.
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        // Three fragments with 10, 15, and 20 rows (offsets 0, 10, 25).
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1 references field id 43, larger than any schema field id,
        // so the manifest-level maximum must come from the data files.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // config_mut allows inserting and removing manifest config entries.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }

    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // Just verify the BTreeMap transformation is OK
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
}