lance_table/format/manifest.rs

// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright The Lance Authors

use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;
use std::collections::{BTreeMap, HashMap};
use std::ops::Range;
use std::sync::Arc;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;

/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments
///  * Indices
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name; `None` if the dataset is on the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id. However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution, as a 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far.
    /// `None` means never set; `Some(0)` means the max id used so far is 0.
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logical offset of each fragment, used to accelerate
    /// fragment lookup by offset range.
    fragment_offsets: Vec<usize>,

    /// The next unassigned row id.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different from configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,

    /// External base paths.
    pub base_paths: HashMap<u32, BasePath>,
}

// We use the most significant bit to indicate that a transaction is detached.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

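/// Returns true if `version` has the detached bit set.
///
/// A minimal sketch of the check (`ignore`d since the exact import path is
/// assumed here):
/// ```ignore
/// assert!(is_detached_version(DETACHED_VERSION_MASK | 42));
/// assert!(!is_detached_version(42));
/// ```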
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}

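/// Compute the logical starting offset of each fragment.
///
/// For example, fragments with row counts `[10, 15, 20]` produce offsets
/// `[0, 10, 25, 45]`: entry `i` is the offset of fragment `i`'s first row,
/// and the trailing entry is the total length of the dataset.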
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        .chain([0]) // Add a trailing entry so the last offset equals the full length of the dataset.
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

#[derive(Default)]
pub struct ManifestSummary {
    pub total_fragments: u64,
    pub total_data_files: u64,
    pub total_files_size: u64,
    pub total_deletion_files: u64,
    pub total_data_file_rows: u64,
    pub total_deletion_file_rows: u64,
    pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
    fn from(summary: ManifestSummary) -> Self {
        let mut stats_map = Self::new();
        stats_map.insert(
            "total_fragments".to_string(),
            summary.total_fragments.to_string(),
        );
        stats_map.insert(
            "total_data_files".to_string(),
            summary.total_data_files.to_string(),
        );
        stats_map.insert(
            "total_files_size".to_string(),
            summary.total_files_size.to_string(),
        );
        stats_map.insert(
            "total_deletion_files".to_string(),
            summary.total_deletion_files.to_string(),
        );
        stats_map.insert(
            "total_data_file_rows".to_string(),
            summary.total_data_file_rows.to_string(),
        );
        stats_map.insert(
            "total_deletion_file_rows".to_string(),
            summary.total_deletion_file_rows.to_string(),
        );
        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
        stats_map
    }
}

impl Manifest {
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update the index section if they want to keep the indices.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow clone of the manifest entirely in memory, without:
    /// - any persistent storage operations
    /// - modifications to the original data
    ///
    /// If the shallow clone is for a branch, `ref_name` is the source branch.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath {
                    id: ref_base_id,
                    name: ref_name,
                    is_dataset_root: true,
                    path: ref_path,
                };
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
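    ///
    /// For example, `timestamp_nanos = 1_700_000_000_123_456_789` maps to the
    /// UTC instant 1,700,000,000 seconds and 123,456,789 nanoseconds after the
    /// Unix epoch.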
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value, in nanoseconds since the Unix epoch
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Set the `config` from an iterator
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this returns an error.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap(); // Fragment ids are assumed to fit in u32

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current.
                // This preserves the high water mark even when fragments are deleted.
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from the fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different from [Schema::max_field_id] because it also considers
    /// field ids that still appear in data files after being dropped from the schema.
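    ///
    /// For example, if the schema's max field id is 2 but a data file still
    /// lists a dropped field with id 43, this returns 43 (see
    /// `test_max_field_id` below).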
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is not older than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, &Fragment)>
    ///     A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
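    /// With fragment lengths `[10, 15, 20]`, for example, the range `5..15`
    /// yields `[(0, fragment_0), (10, fragment_1)]`.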
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temporary
    /// storage, that can then be used to create a dataset.
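    ///
    /// A round-trip sketch (decoding uses `prost::Message` on the generated type):
    /// ```ignore
    /// let bytes = manifest.serialized();
    /// let decoded = pb::Manifest::decode(bytes.as_slice())?;
    /// let restored = Manifest::try_from(decoded)?;
    /// ```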
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Get the summary information of a manifest.
    ///
    /// This function calculates various statistics about the manifest, including:
    /// - total_files_size: Total size of all data files in bytes
    /// - total_fragments: Total number of fragments in the dataset
    /// - total_data_files: Total number of data files across all fragments
    /// - total_deletion_files: Total number of deletion files
    /// - total_data_file_rows: Total number of physical rows in data files
    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
    /// - total_rows: Total number of live rows in the dataset
    pub fn summary(&self) -> ManifestSummary {
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    // Count data files in the current fragment
                    summary.total_data_files += f.files.len() as u64;
                    // Sum the number of live rows for the current fragment (if available);
                    // num_rows() already excludes deleted rows.
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    // Sum file sizes for all data files in the current fragment (if available)
                    for data_file in &f.files {
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    // Check and count if the current fragment has a deletion file
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    // Sum the number of deleted rows from the deletion file (if available)
                    if let Some(deletion_file) = &f.deletion_file {
                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
                            summary.total_deletion_file_rows += num_deleted as u64;
                        }
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Physical rows in data files = live rows + deleted rows
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct BasePath {
    pub id: u32,
    pub name: Option<String>,
    pub is_dataset_root: bool,
    pub path: String,
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
    /// Try to parse the version string as a semver string. Returns None if
    /// not successful.
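    ///
    /// For example, `"0.10.1"` parses to `Some((0, 10, 1, None))`; a fourth
    /// dot-separated component, as in `"0.10.1.beta"`, is returned as the tag.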
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Return true if self is older than the given major/minor/patch
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }

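    /// Bump one part of the version, leaving the other parts unchanged.
    /// Note that lower parts are not reset: `1.2.3` bumped at `Minor`
    /// becomes `1.3.3`, not `1.3.0`.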
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next (patch) version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl From<pb::BasePath> for BasePath {
    fn from(p: pb::BasePath) -> Self {
        Self {
            id: p.id,
            name: p.name,
            is_dataset_root: p.is_dataset_root,
            path: p.path,
        }
    }
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1_000_000_000;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        // We only use the writer version if it is fully set.
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.schema_metadata,
        };

        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    // If there are fragments, they are a better indicator
                    DataStorageFormat::new(inferred_version)
                } else {
                    // No fragments to inspect, best we can do is look at writer flags
                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                        DataStorageFormat::new(LanceFileVersion::Stable)
                    } else {
                        DataStorageFormat::new(LanceFileVersion::Legacy)
                    }
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            branch: p.branch,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            table_metadata: p.table_metadata,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
            base_paths: p
                .base_paths
                .iter()
                .map(|item| (item.id, item.clone().into()))
                .collect(),
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1_000_000_000;
            let seconds = ((m.timestamp_nanos - nanos) / 1_000_000_000) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            schema_metadata: m
                .schema
                .metadata
                .iter()
                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
                .collect(),
            version: m.version,
            branch: m.branch.clone(),
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            table_metadata: m.table_metadata.clone(),
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
            base_paths: m
                .base_paths
                .values()
                .map(|base_path| pb::BasePath {
                    id: base_path.id,
                    name: base_path.name.clone(),
                    is_dataset_root: base_path.is_dataset_root,
                    path: base_path.path.clone(),
                })
                .collect(),
        }
    }
}

#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::{DataFile, DeletionFile, DeletionFileType};
    use std::num::NonZero;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major, minor, patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }
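
    // Added check (a small sketch, not in the original file): exercises the
    // detached-version bit helpers defined above.
    #[test]
    fn test_is_detached_version() {
        assert!(is_detached_version(DETACHED_VERSION_MASK));
        assert!(is_detached_version(DETACHED_VERSION_MASK | 7));
        assert!(!is_detached_version(7));
        assert!(!is_detached_version(0));
    }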

    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version=*/ None,
            HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version=*/ None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version=*/ None,
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }
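
    // Added sketch (not in the original file): `set_timestamp`/`timestamp`
    // should round-trip through the seconds + subsecond-nanos split.
    #[test]
    fn test_timestamp_roundtrip() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut manifest = Manifest::new(
            schema,
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );
        manifest.set_timestamp(1_700_000_000_123_456_789);
        let ts = manifest.timestamp();
        assert_eq!(ts.timestamp(), 1_700_000_000);
        assert_eq!(ts.timestamp_subsec_nanos(), 123_456_789);
    }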

    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // Just verify the transformation is OK
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7);
    }
}