lance_table/format/manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::reader::FileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name, None if the dataset is the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different than configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,

    /// External base paths, keyed by base-path id.
    ///
    /// Data and deletion files can carry a `base_id` referring to one of these
    /// entries (see `shallow_clone`), letting them live outside the dataset root.
    pub base_paths: HashMap<u32, BasePath>,
}
108
/// Detached transactions are flagged by the most significant bit of the version.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// True when `version` carries the detached-transaction flag bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
115
116fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
117    fragments
118        .iter()
119        .map(|f| f.num_rows().unwrap_or_default())
120        .chain([0]) // Make the last offset to be the full-length of the dataset.
121        .scan(0_usize, |offset, len| {
122            let start = *offset;
123            *offset += len;
124            Some(start)
125        })
126        .collect()
127}
128
/// Aggregate statistics about a manifest, as computed by `Manifest::summary`.
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments in the dataset.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Total size of all data files in bytes (only files with a known size).
    pub total_files_size: u64,
    /// Number of fragments that have a deletion file.
    pub total_deletion_files: u64,
    /// Rows physically stored in data files; computed as
    /// `total_rows + total_deletion_file_rows`.
    pub total_data_file_rows: u64,
    /// Rows marked deleted via deletion files (only files with a known count).
    pub total_deletion_file_rows: u64,
    /// Total number of rows in the dataset (sum of fragment row counts).
    pub total_rows: u64,
}
139
140impl From<ManifestSummary> for BTreeMap<String, String> {
141    fn from(summary: ManifestSummary) -> Self {
142        let mut stats_map = Self::new();
143        stats_map.insert(
144            "total_fragments".to_string(),
145            summary.total_fragments.to_string(),
146        );
147        stats_map.insert(
148            "total_data_files".to_string(),
149            summary.total_data_files.to_string(),
150        );
151        stats_map.insert(
152            "total_files_size".to_string(),
153            summary.total_files_size.to_string(),
154        );
155        stats_map.insert(
156            "total_deletion_files".to_string(),
157            summary.total_deletion_files.to_string(),
158        );
159        stats_map.insert(
160            "total_data_file_rows".to_string(),
161            summary.total_data_file_rows.to_string(),
162        );
163        stats_map.insert(
164            "total_deletion_file_rows".to_string(),
165            summary.total_deletion_file_rows.to_string(),
166        );
167        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
168        stats_map
169    }
170}
171
172impl Manifest {
173    pub fn new(
174        schema: Schema,
175        fragments: Arc<Vec<Fragment>>,
176        data_storage_format: DataStorageFormat,
177        blob_dataset_version: Option<u64>,
178        base_paths: HashMap<u32, BasePath>,
179    ) -> Self {
180        let fragment_offsets = compute_fragment_offsets(&fragments);
181        let local_schema = schema.retain_storage_class(StorageClass::Default);
182
183        Self {
184            schema,
185            local_schema,
186            version: 1,
187            branch: None,
188            writer_version: Some(WriterVersion::default()),
189            fragments,
190            version_aux_data: 0,
191            index_section: None,
192            timestamp_nanos: 0,
193            tag: None,
194            reader_feature_flags: 0,
195            writer_feature_flags: 0,
196            max_fragment_id: None,
197            transaction_file: None,
198            fragment_offsets,
199            next_row_id: 0,
200            data_storage_format,
201            config: HashMap::new(),
202            table_metadata: HashMap::new(),
203            blob_dataset_version,
204            base_paths,
205        }
206    }
207
208    pub fn new_from_previous(
209        previous: &Self,
210        schema: Schema,
211        fragments: Arc<Vec<Fragment>>,
212        new_blob_version: Option<u64>,
213    ) -> Self {
214        let fragment_offsets = compute_fragment_offsets(&fragments);
215        let local_schema = schema.retain_storage_class(StorageClass::Default);
216
217        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);
218
219        Self {
220            schema,
221            local_schema,
222            version: previous.version + 1,
223            branch: previous.branch.clone(),
224            writer_version: Some(WriterVersion::default()),
225            fragments,
226            version_aux_data: 0,
227            index_section: None, // Caller should update index if they want to keep them.
228            timestamp_nanos: 0,  // This will be set on commit
229            tag: None,
230            reader_feature_flags: 0, // These will be set on commit
231            writer_feature_flags: 0, // These will be set on commit
232            max_fragment_id: previous.max_fragment_id,
233            transaction_file: None,
234            fragment_offsets,
235            next_row_id: previous.next_row_id,
236            data_storage_format: previous.data_storage_format.clone(),
237            config: previous.config.clone(),
238            table_metadata: previous.table_metadata.clone(),
239            blob_dataset_version,
240            base_paths: previous.base_paths.clone(),
241        }
242    }
243
    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        // Re-home every data/deletion file that does not already belong to an
        // external base: after the clone, such files resolve against the new
        // base path (`ref_base_id`) instead of the clone's own root.
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            // Same version number as the source; the clone diverges from here.
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            // Register the source location as a new external base path.
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }
306
307    /// Return the `timestamp_nanos` value as a Utc DateTime
308    pub fn timestamp(&self) -> DateTime<Utc> {
309        let nanos = self.timestamp_nanos % 1_000_000_000;
310        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
311        Utc.from_utc_datetime(
312            &DateTime::from_timestamp(seconds, nanos as u32)
313                .unwrap_or_default()
314                .naive_utc(),
315        )
316    }
317
    /// Set the `timestamp_nanos` value (nanoseconds since the Unix epoch).
    ///
    /// Note: despite the original doc wording, this takes raw nanoseconds,
    /// not a `DateTime`; use `timestamp()` to read it back as a `DateTime<Utc>`.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }
346
    /// Set the `config` from an iterator
    ///
    /// Existing keys are overwritten; keys not mentioned are left untouched.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    ///
    /// Keys that are not present are ignored.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// Returns an invalid-input error if no field with the given id exists in
    /// the schema. (The old doc claimed this was a no-op in that case, which
    /// did not match the implementation.)
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }
390
391    /// Check the current fragment list and update the high water mark
392    pub fn update_max_fragment_id(&mut self) {
393        // If there are no fragments, don't update max_fragment_id
394        if self.fragments.is_empty() {
395            return;
396        }
397
398        let max_fragment_id = self
399            .fragments
400            .iter()
401            .map(|f| f.id)
402            .max()
403            .unwrap() // Safe because we checked fragments is not empty
404            .try_into()
405            .unwrap();
406
407        match self.max_fragment_id {
408            None => {
409                // First time being set
410                self.max_fragment_id = Some(max_fragment_id);
411            }
412            Some(current_max) => {
413                // Only update if the computed max is greater than current
414                // This preserves the high water mark even when fragments are deleted
415                if max_fragment_id > current_max {
416                    self.max_fragment_id = Some(max_fragment_id);
417                }
418            }
419        }
420    }
421
422    /// Return the max fragment id.
423    /// Note this does not support recycling of fragment ids.
424    ///
425    /// This will return None if there are no fragments and max_fragment_id was never set.
426    pub fn max_fragment_id(&self) -> Option<u64> {
427        if let Some(max_id) = self.max_fragment_id {
428            // Return the stored high water mark
429            Some(max_id.into())
430        } else {
431            // Not yet set, compute from fragment list
432            self.fragments.iter().map(|f| f.id).max()
433        }
434    }
435
436    /// Get the max used field id
437    ///
438    /// This is different than [Schema::max_field_id] because it also considers
439    /// the field ids in the data files that have been dropped from the schema.
440    pub fn max_field_id(&self) -> i32 {
441        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
442        let fragment_max_id = self
443            .fragments
444            .iter()
445            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
446            .max()
447            .copied();
448        let fragment_max_id = fragment_max_id.unwrap_or(-1);
449        schema_max_id.max(fragment_max_id)
450    }
451
452    /// Return the fragments that are newer than the given manifest.
453    /// Note this does not support recycling of fragment ids.
454    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
455        if since.version >= self.version {
456            return Err(Error::io(
457                format!(
458                    "fragments_since: given version {} is newer than manifest version {}",
459                    since.version, self.version
460                ),
461                location!(),
462            ));
463        }
464        let start = since.max_fragment_id();
465        Ok(self
466            .fragments
467            .iter()
468            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
469            .cloned()
470            .collect())
471    }
472
    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Locate the fragment whose starting offset covers `start`.
        // `fragment_offsets` always begins with 0 (see compute_fragment_offsets),
        // so an Err insertion point is >= 1 and `idx - 1` cannot underflow.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop at the first fragment that does not overlap [start, end):
            // either it starts at/after `end`, or it ends at/before `start`.
            // NOTE(review): the second condition `break`s instead of skipping;
            // this assumes only the first inspected fragment can end before
            // `start` — confirm behavior when zero-row fragments are present.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }
509
    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        // Stable row ids are advertised through a reader feature flag.
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        // Serialize via the protobuf representation.
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// Whether data files should be written with the legacy file format version.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
525
526    /// Get the summary information of a manifest.
527    ///
528    /// This function calculates various statistics about the manifest, including:
529    /// - total_files_size: Total size of all data files in bytes
530    /// - total_fragments: Total number of fragments in the dataset
531    /// - total_data_files: Total number of data files across all fragments
532    /// - total_deletion_files: Total number of deletion files
533    /// - total_data_file_rows: Total number of rows in data files
534    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
535    /// - total_rows: Total number of rows in the dataset
536    pub fn summary(&self) -> ManifestSummary {
537        // Calculate total fragments
538        let mut summary =
539            self.fragments
540                .iter()
541                .fold(ManifestSummary::default(), |mut summary, f| {
542                    // Count data files in the current fragment
543                    summary.total_data_files += f.files.len() as u64;
544                    // Sum the number of rows for the current fragment (if available)
545                    if let Some(num_rows) = f.num_rows() {
546                        summary.total_rows += num_rows as u64;
547                    }
548                    // Sum file sizes for all data files in the current fragment (if available)
549                    for data_file in &f.files {
550                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
551                            summary.total_files_size += size_bytes.get();
552                        }
553                    }
554                    // Check and count if the current fragment has a deletion file
555                    if f.deletion_file.is_some() {
556                        summary.total_deletion_files += 1;
557                    }
558                    // Sum the number of deleted rows from the deletion file (if available)
559                    if let Some(deletion_file) = &f.deletion_file {
560                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
561                            summary.total_deletion_file_rows += num_deleted as u64;
562                        }
563                    }
564                    summary
565                });
566        summary.total_fragments = self.fragments.len() as u64;
567        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
568
569        summary
570    }
571}
572
/// A base location that data and deletion files may be stored under.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique identifier; referenced by files via their `base_id`.
    pub id: u32,
    /// Optional human-readable name for this base.
    pub name: Option<String>,
    /// Whether this base is a dataset root (vs. a data-only base).
    pub is_dataset_root: bool,
    /// The full URI string (e.g., "s3://bucket/path")
    pub path: String,
}
581
impl BasePath {
    /// Create a new BasePath
    ///
    /// # Arguments
    ///
    /// * `id` - Unique identifier for this base path
    /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path")
    /// * `name` - Optional human-readable name for this base
    /// * `is_dataset_root` - Whether this is the dataset root or a data-only base
    pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
        Self {
            id,
            name,
            is_dataset_root,
            path,
        }
    }

    /// Extract the object store path from this BasePath's URI.
    ///
    /// This is a synchronous operation that parses the URI without initializing an object store.
    ///
    /// Errors from URI parsing are propagated to the caller.
    pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
        ObjectStore::extract_path_from_uri(registry, &self.path)
    }
}
607
impl DeepSizeOf for BasePath {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // NOTE(review): `path` is counted twice and an inline bool's size is
        // added, while `id` is not counted. `deep_size_of_children` normally
        // counts only heap-owned data once — confirm this weighting is
        // intentional (e.g. a deliberate overhead estimate) and document why.
        self.name.deep_size_of_children(context)
            + self.path.deep_size_of_children(context) * 2
            + size_of::<bool>()
    }
}
615
/// Version of the library that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Library name, e.g. "lance".
    pub library: String,
    /// Clean `major.minor.patch` version; pre-release/build are kept separately.
    pub version: String,
    /// Optional semver pre-release component (e.g. "rc.1").
    pub prerelease: Option<String>,
    /// Optional semver build metadata (e.g. "build.123").
    pub build_metadata: Option<String>,
}

/// Storage format of the data files in a dataset.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name ("lance" for Lance files).
    pub file_format: String,
    /// File format version string, parseable as `LanceFileVersion`.
    pub version: String,
}
629
/// Canonical file-format name stored in `DataStorageFormat::file_format`.
const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    /// Create a `DataStorageFormat` for the given Lance file version.
    ///
    /// The version is passed through `resolve()` before being stringified
    /// (presumably normalizing aliases to a concrete version — see
    /// `LanceFileVersion::resolve`).
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    /// Parse the stored version string back into a `LanceFileVersion`.
    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}
644
impl Default for DataStorageFormat {
    /// Default to the library's default `LanceFileVersion`.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
650
/// Convert from the protobuf message, taking both strings as-is.
impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}
659
/// Which component of a semantic version to bump (see `bump_version`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
666
667fn bump_version(version: &mut semver::Version, part: VersionPart) {
668    match part {
669        VersionPart::Major => {
670            version.major += 1;
671            version.minor = 0;
672            version.patch = 0;
673        }
674        VersionPart::Minor => {
675            version.minor += 1;
676            version.patch = 0;
677        }
678        VersionPart::Patch => {
679            version.patch += 1;
680        }
681    }
682}
683
684impl WriterVersion {
685    /// Split a version string into clean version (major.minor.patch), prerelease, and build metadata.
686    ///
687    /// Returns None if the input is not a valid semver string.
688    ///
689    /// For example:
690    /// - "2.0.0-rc.1" -> Some(("2.0.0", Some("rc.1"), None))
691    /// - "2.0.0-rc.1+build.123" -> Some(("2.0.0", Some("rc.1"), Some("build.123")))
692    /// - "2.0.0+build.123" -> Some(("2.0.0", None, Some("build.123")))
693    /// - "not-a-version" -> None
694    fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
695        let mut parsed = semver::Version::parse(full_version).ok()?;
696
697        let prerelease = if parsed.pre.is_empty() {
698            None
699        } else {
700            Some(parsed.pre.to_string())
701        };
702
703        let build_metadata = if parsed.build.is_empty() {
704            None
705        } else {
706            Some(parsed.build.to_string())
707        };
708
709        // Remove prerelease and build metadata to get clean version
710        parsed.pre = semver::Prerelease::EMPTY;
711        parsed.build = semver::BuildMetadata::EMPTY;
712        Some((parsed.to_string(), prerelease, build_metadata))
713    }
714
715    /// Try to parse the version string as a semver string. Returns None if
716    /// not successful.
717    #[deprecated(note = "Use `lance_lib_version()` instead")]
718    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
719        // First split by '-' to separate the version from the pre-release tag
720        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
721            (
722                &self.version[..dash_idx],
723                Some(&self.version[dash_idx + 1..]),
724            )
725        } else {
726            (self.version.as_str(), None)
727        };
728
729        let mut parts = version_part.split('.');
730        let major = parts.next().unwrap_or("0").parse().ok()?;
731        let minor = parts.next().unwrap_or("0").parse().ok()?;
732        let patch = parts.next().unwrap_or("0").parse().ok()?;
733
734        Some((major, minor, patch, tag))
735    }
736
737    /// If the library is "lance", parse the version as semver and return it.
738    /// Returns None if the library is not "lance" or the version cannot be parsed as semver.
739    ///
740    /// This method reconstructs the full semantic version by combining the version field
741    /// with the prerelease and build_metadata fields (if present). For example:
742    /// - version="2.0.0" + prerelease=Some("rc.1") -> "2.0.0-rc.1"
743    /// - version="2.0.0" + prerelease=Some("rc.1") + build_metadata=Some("build.123") -> "2.0.0-rc.1+build.123"
744    pub fn lance_lib_version(&self) -> Option<semver::Version> {
745        if self.library != "lance" {
746            return None;
747        }
748
749        let mut version = semver::Version::parse(&self.version).ok()?;
750
751        if let Some(ref prerelease) = self.prerelease {
752            version.pre = semver::Prerelease::new(prerelease).ok()?;
753        }
754
755        if let Some(ref build_metadata) = self.build_metadata {
756            version.build = semver::BuildMetadata::new(build_metadata).ok()?;
757        }
758
759        Some(version)
760    }
761
    /// Like [`Self::semver`], but panics (with the offending version string)
    /// instead of returning None when parsing fails.
    #[deprecated(
        note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
    )]
    #[allow(deprecated)]
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }
770
771    /// Check if this is a Lance library version older than the given major/minor/patch.
772    ///
773    /// # Panics
774    ///
775    /// Panics if the library is not "lance" or the version cannot be parsed as semver.
776    #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
777    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
778        let version = self
779            .lance_lib_version()
780            .expect("Not lance library or invalid version");
781        let other = semver::Version {
782            major: major.into(),
783            minor: minor.into(),
784            patch: patch.into(),
785            pre: semver::Prerelease::EMPTY,
786            build: semver::BuildMetadata::EMPTY,
787        };
788        version < other
789    }
790
    /// Return a copy of this version with `part` bumped, optionally keeping
    /// the pre-release tag.
    ///
    /// Panics if this is not a parseable "lance" version.
    #[deprecated(note = "This is meant for testing and will be made private in future version.")]
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        // Parse, bump the requested component, then re-split the result back
        // into the struct's (clean version, prerelease, build) fields.
        let mut version = self.lance_lib_version().expect("Should be lance version");
        bump_version(&mut version, part);
        if !keep_tag {
            version.pre = semver::Prerelease::EMPTY;
        }
        let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
            .expect("Bumped version should be valid semver");
        Self {
            library: self.library.clone(),
            version: clean_version,
            prerelease,
            build_metadata,
        }
    }
807}
808
impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        // The writer version is derived from this crate's own version.
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
    }

    // Unit tests always run as if they are in the next version.
    #[cfg(test)]
    #[allow(deprecated)]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
        // Bump the patch (keeping any pre-release tag) so tests exercise
        // "written by a newer version" code paths.
        .bump(VersionPart::Patch, true)
    }
}
839
/// Associates `Manifest` with its protobuf message type so generic proto
/// helpers can (de)serialize it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
843
/// Convert from the protobuf message, reusing `BasePath::new`.
impl From<pb::BasePath> for BasePath {
    fn from(p: pb::BasePath) -> Self {
        Self::new(p.id, p.path, p.name, p.is_dataset_root)
    }
}
849
/// Convert into the protobuf message, moving all fields as-is.
impl From<BasePath> for pb::BasePath {
    fn from(p: BasePath) -> Self {
        Self {
            id: p.id,
            name: p.name,
            is_dataset_root: p.is_dataset_root,
            path: p.path,
        }
    }
}
860
861impl TryFrom<pb::Manifest> for Manifest {
862    type Error = Error;
863
864    fn try_from(p: pb::Manifest) -> Result<Self> {
865        let timestamp_nanos = p.timestamp.map(|ts| {
866            let sec = ts.seconds as u128 * 1e9 as u128;
867            let nanos = ts.nanos as u128;
868            sec + nanos
869        });
870        // We only use the writer version if it is fully set.
871        let writer_version = match p.writer_version {
872            Some(pb::manifest::WriterVersion {
873                library,
874                version,
875                prerelease,
876                build_metadata,
877            }) => Some(WriterVersion {
878                library,
879                version,
880                prerelease,
881                build_metadata,
882            }),
883            _ => None,
884        };
885        let fragments = Arc::new(
886            p.fragments
887                .into_iter()
888                .map(Fragment::try_from)
889                .collect::<Result<Vec<_>>>()?,
890        );
891        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
892        let fields_with_meta = FieldsWithMeta {
893            fields: Fields(p.fields),
894            metadata: p.schema_metadata,
895        };
896
897        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
898            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
899        {
900            return Err(Error::Internal {
901                message: "All fragments must have row ids".into(),
902                location: location!(),
903            });
904        }
905
906        let data_storage_format = match p.data_format {
907            None => {
908                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
909                    // If there are fragments, they are a better indicator
910                    DataStorageFormat::new(inferred_version)
911                } else {
912                    // No fragments to inspect, best we can do is look at writer flags
913                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
914                        DataStorageFormat::new(LanceFileVersion::Stable)
915                    } else {
916                        DataStorageFormat::new(LanceFileVersion::Legacy)
917                    }
918                }
919            }
920            Some(format) => DataStorageFormat::from(format),
921        };
922
923        let schema = Schema::from(fields_with_meta);
924        let local_schema = schema.retain_storage_class(StorageClass::Default);
925
926        Ok(Self {
927            schema,
928            local_schema,
929            version: p.version,
930            branch: p.branch,
931            writer_version,
932            version_aux_data: p.version_aux_data as usize,
933            index_section: p.index_section.map(|i| i as usize),
934            timestamp_nanos: timestamp_nanos.unwrap_or(0),
935            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
936            reader_feature_flags: p.reader_feature_flags,
937            writer_feature_flags: p.writer_feature_flags,
938            max_fragment_id: p.max_fragment_id,
939            fragments,
940            transaction_file: if p.transaction_file.is_empty() {
941                None
942            } else {
943                Some(p.transaction_file)
944            },
945            fragment_offsets,
946            next_row_id: p.next_row_id,
947            data_storage_format,
948            config: p.config,
949            table_metadata: p.table_metadata,
950            blob_dataset_version: if p.blob_dataset_version == 0 {
951                None
952            } else {
953                Some(p.blob_dataset_version)
954            },
955            base_paths: p
956                .base_paths
957                .iter()
958                .map(|item| (item.id, item.clone().into()))
959                .collect(),
960        })
961    }
962}
963
964impl From<&Manifest> for pb::Manifest {
965    fn from(m: &Manifest) -> Self {
966        let timestamp_nanos = if m.timestamp_nanos == 0 {
967            None
968        } else {
969            let nanos = m.timestamp_nanos % 1e9 as u128;
970            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
971            Some(Timestamp {
972                seconds,
973                nanos: nanos as i32,
974            })
975        };
976        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
977        Self {
978            fields: fields_with_meta.fields.0,
979            schema_metadata: m
980                .schema
981                .metadata
982                .iter()
983                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
984                .collect(),
985            version: m.version,
986            branch: m.branch.clone(),
987            writer_version: m
988                .writer_version
989                .as_ref()
990                .map(|wv| pb::manifest::WriterVersion {
991                    library: wv.library.clone(),
992                    version: wv.version.clone(),
993                    prerelease: wv.prerelease.clone(),
994                    build_metadata: wv.build_metadata.clone(),
995                }),
996            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
997            table_metadata: m.table_metadata.clone(),
998            version_aux_data: m.version_aux_data as u64,
999            index_section: m.index_section.map(|i| i as u64),
1000            timestamp: timestamp_nanos,
1001            tag: m.tag.clone().unwrap_or_default(),
1002            reader_feature_flags: m.reader_feature_flags,
1003            writer_feature_flags: m.writer_feature_flags,
1004            max_fragment_id: m.max_fragment_id,
1005            transaction_file: m.transaction_file.clone().unwrap_or_default(),
1006            next_row_id: m.next_row_id,
1007            data_format: Some(pb::manifest::DataStorageFormat {
1008                file_format: m.data_storage_format.file_format.clone(),
1009                version: m.data_storage_format.version.clone(),
1010            }),
1011            config: m.config.clone(),
1012            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
1013            base_paths: m
1014                .base_paths
1015                .values()
1016                .map(|base_path| pb::BasePath {
1017                    id: base_path.id,
1018                    name: base_path.name.clone(),
1019                    is_dataset_root: base_path.is_dataset_root,
1020                    path: base_path.path.clone(),
1021                })
1022                .collect(),
1023        }
1024    }
1025}
1026
1027#[async_trait]
1028pub trait SelfDescribingFileReader {
1029    /// Open a file reader without any cached schema
1030    ///
1031    /// In this case the schema will first need to be loaded
1032    /// from the file itself.
1033    ///
1034    /// When loading files from a dataset it is preferable to use
1035    /// the fragment reader to avoid this overhead.
1036    async fn try_new_self_described(
1037        object_store: &ObjectStore,
1038        path: &Path,
1039        cache: Option<&LanceCache>,
1040    ) -> Result<Self>
1041    where
1042        Self: Sized,
1043    {
1044        let reader = object_store.open(path).await?;
1045        Self::try_new_self_described_from_reader(reader.into(), cache).await
1046    }
1047
1048    async fn try_new_self_described_from_reader(
1049        reader: Arc<dyn Reader>,
1050        cache: Option<&LanceCache>,
1051    ) -> Result<Self>
1052    where
1053        Self: Sized;
1054}
1055
1056#[async_trait]
1057impl SelfDescribingFileReader for FileReader {
1058    async fn try_new_self_described_from_reader(
1059        reader: Arc<dyn Reader>,
1060        cache: Option<&LanceCache>,
1061    ) -> Result<Self> {
1062        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
1063        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
1064            message: format!(
1065                "Attempt to open file at {} as self-describing but it did not contain a manifest",
1066                reader.path(),
1067            ),
1068            location: location!(),
1069        })?;
1070        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
1071        if manifest.should_use_legacy_format() {
1072            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
1073        }
1074        let schema = manifest.schema;
1075        let max_field_id = schema.max_field_id().unwrap_or_default();
1076        Self::try_new_from_reader(
1077            reader.path(),
1078            reader.clone(),
1079            Some(metadata),
1080            schema,
1081            0,
1082            0,
1083            max_field_id,
1084            cache,
1085        )
1086        .await
1087    }
1088}
1089
1090#[cfg(test)]
1091mod tests {
1092    use crate::format::{DataFile, DeletionFile, DeletionFileType};
1093    use std::num::NonZero;
1094
1095    use super::*;
1096
1097    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1098    use lance_core::datatypes::Field;
1099
    /// The default `WriterVersion` must mirror the crate version, split into
    /// clean version / prerelease components, and (under `cfg(test)`) be
    /// bumped one patch ahead of `CARGO_PKG_VERSION`.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        // Parse the actual cargo version to check if it has a pre-release tag
        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        // Verify the version field contains only major.minor.patch
        let version_parts: Vec<&str> = wv.version.split('.').collect();
        assert_eq!(
            version_parts.len(),
            3,
            "Version should be major.minor.patch"
        );
        assert!(
            !wv.version.contains('-'),
            "Version field should not contain prerelease"
        );

        // Verify the prerelease field matches the expected tag
        assert_eq!(wv.prerelease.as_deref(), expected_tag);
        // Build metadata should be None for default version
        assert_eq!(wv.build_metadata, None);

        // Verify lance_lib_version() reconstructs the full semver correctly
        let version = wv.lance_lib_version().unwrap();
        assert_eq!(
            version.major,
            env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.minor,
            env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.patch,
            // Unit tests run against (major,minor,patch + 1); see the
            // cfg(test) Default impl.
            env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
        );
        assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));

        // Bumping any part must always produce a strictly greater version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let mut bumped_version = version.clone();
            bump_version(&mut bumped_version, *part);
            assert!(version < bumped_version);
        }
    }
1153
1154    #[test]
1155    fn test_writer_version_split() {
1156        // Test splitting version with prerelease
1157        let (version, prerelease, build_metadata) =
1158            WriterVersion::split_version("2.0.0-rc.1").unwrap();
1159        assert_eq!(version, "2.0.0");
1160        assert_eq!(prerelease, Some("rc.1".to_string()));
1161        assert_eq!(build_metadata, None);
1162
1163        // Test splitting version without prerelease
1164        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
1165        assert_eq!(version, "2.0.0");
1166        assert_eq!(prerelease, None);
1167        assert_eq!(build_metadata, None);
1168
1169        // Test splitting version with prerelease and build metadata
1170        let (version, prerelease, build_metadata) =
1171            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
1172        assert_eq!(version, "2.0.0");
1173        assert_eq!(prerelease, Some("rc.1".to_string()));
1174        assert_eq!(build_metadata, Some("build.123".to_string()));
1175
1176        // Test splitting version with only build metadata
1177        let (version, prerelease, build_metadata) =
1178            WriterVersion::split_version("2.0.0+build.123").unwrap();
1179        assert_eq!(version, "2.0.0");
1180        assert_eq!(prerelease, None);
1181        assert_eq!(build_metadata, Some("build.123".to_string()));
1182
1183        // Test with invalid version returns None
1184        assert!(WriterVersion::split_version("not-a-version").is_none());
1185    }
1186
1187    #[test]
1188    fn test_writer_version_comparison_with_prerelease() {
1189        let v1 = WriterVersion {
1190            library: "lance".to_string(),
1191            version: "2.0.0".to_string(),
1192            prerelease: Some("rc.1".to_string()),
1193            build_metadata: None,
1194        };
1195
1196        let v2 = WriterVersion {
1197            library: "lance".to_string(),
1198            version: "2.0.0".to_string(),
1199            prerelease: None,
1200            build_metadata: None,
1201        };
1202
1203        let semver1 = v1.lance_lib_version().unwrap();
1204        let semver2 = v2.lance_lib_version().unwrap();
1205
1206        // rc.1 should be less than the release version
1207        assert!(semver1 < semver2);
1208    }
1209
1210    #[test]
1211    fn test_writer_version_with_build_metadata() {
1212        let v = WriterVersion {
1213            library: "lance".to_string(),
1214            version: "2.0.0".to_string(),
1215            prerelease: Some("rc.1".to_string()),
1216            build_metadata: Some("build.123".to_string()),
1217        };
1218
1219        let semver = v.lance_lib_version().unwrap();
1220        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
1221        assert_eq!(semver.major, 2);
1222        assert_eq!(semver.minor, 0);
1223        assert_eq!(semver.patch, 0);
1224        assert_eq!(semver.pre.as_str(), "rc.1");
1225        assert_eq!(semver.build.as_str(), "build.123");
1226    }
1227
1228    #[test]
1229    fn test_writer_version_non_semver() {
1230        // Test that Lance library can have non-semver version strings
1231        let v = WriterVersion {
1232            library: "lance".to_string(),
1233            version: "custom-build-v1".to_string(),
1234            prerelease: None,
1235            build_metadata: None,
1236        };
1237
1238        // lance_lib_version should return None for non-semver
1239        assert!(v.lance_lib_version().is_none());
1240
1241        // But the WriterVersion itself should still be valid and usable
1242        assert_eq!(v.library, "lance");
1243        assert_eq!(v.version, "custom-build-v1");
1244    }
1245
1246    #[test]
1247    #[allow(deprecated)]
1248    fn test_older_than_with_prerelease() {
1249        // Test that older_than correctly handles prerelease
1250        let v_rc = WriterVersion {
1251            library: "lance".to_string(),
1252            version: "2.0.0".to_string(),
1253            prerelease: Some("rc.1".to_string()),
1254            build_metadata: None,
1255        };
1256
1257        // 2.0.0-rc.1 should be older than 2.0.0
1258        assert!(v_rc.older_than(2, 0, 0));
1259
1260        // 2.0.0-rc.1 should be older than 2.0.1
1261        assert!(v_rc.older_than(2, 0, 1));
1262
1263        // 2.0.0-rc.1 should not be older than 1.9.9
1264        assert!(!v_rc.older_than(1, 9, 9));
1265
1266        let v_release = WriterVersion {
1267            library: "lance".to_string(),
1268            version: "2.0.0".to_string(),
1269            prerelease: None,
1270            build_metadata: None,
1271        };
1272
1273        // 2.0.0 should not be older than 2.0.0
1274        assert!(!v_release.older_than(2, 0, 0));
1275
1276        // 2.0.0 should be older than 2.0.1
1277        assert!(v_release.older_than(2, 0, 1));
1278    }
1279
    /// `fragments_by_offset_range` must return `(starting row offset, fragment)`
    /// pairs for every fragment overlapping the queried row-offset range.
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        // Three fragments with 10, 15 and 20 rows -> start offsets 0, 10, 25.
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            HashMap::new(),
        );

        // Range fully inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first two fragments.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range starting exactly at the second fragment and extending past
        // the end of the dataset.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
1326
    /// `max_field_id` must consider field ids recorded in the data files,
    /// not just the schema: here a data file references id 43 while the
    /// schema only declares ids 0 and 2.
    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    // Field id 43 exceeds anything declared in the schema.
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }
1379
1380    #[test]
1381    fn test_config() {
1382        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1383            "a",
1384            arrow_schema::DataType::Int64,
1385            false,
1386        )]);
1387        let schema = Schema::try_from(&arrow_schema).unwrap();
1388        let fragments = vec![
1389            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1390            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1391            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1392        ];
1393        let mut manifest = Manifest::new(
1394            schema,
1395            Arc::new(fragments),
1396            DataStorageFormat::default(),
1397            /*blob_dataset_version= */ None,
1398            HashMap::new(),
1399        );
1400
1401        let mut config = manifest.config.clone();
1402        config.insert("lance.test".to_string(), "value".to_string());
1403        config.insert("other-key".to_string(), "other-value".to_string());
1404
1405        manifest.config_mut().extend(config.clone());
1406        assert_eq!(manifest.config, config.clone());
1407
1408        config.remove("other-key");
1409        manifest.config_mut().remove("other-key");
1410        assert_eq!(manifest.config, config);
1411    }
1412
    /// Exercise `Manifest::summary()` over four manifests: no fragments,
    /// zero-row files, real data, and data with a deletion file.
    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // With no fragments, every counter should be zero.
        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Fragments and files are counted even when they hold zero rows.
        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Deleted rows are subtracted from total_rows but tracked separately.
        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // Just verify the summary-to-map transformation is OK
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
1526}