lance_table/format/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::reader::FileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
28/// Manifest of a dataset
29///
30///  * Schema
31///  * Version
32///  * Fragments.
33///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Branch name, None if the dataset is the main branch.
    pub branch: Option<String>,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Table metadata.
    ///
    /// This is a key-value map that can be used to store arbitrary metadata
    /// associated with the table. This is different than configuration, which
    /// is used to tell libraries how to read, write, or manage the table.
    pub table_metadata: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,

    /// External base paths, keyed by base path id.  Data files and deletion
    /// files may reference one of these bases via their `base_id` instead of
    /// living under the dataset root (see [`Manifest::shallow_clone`]).
    pub base_paths: HashMap<u32, BasePath>,
}
108
/// The most significant bit of a version number marks a detached transaction.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// True when `version` carries the detached-transaction marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
115
116fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
117    fragments
118        .iter()
119        .map(|f| f.num_rows().unwrap_or_default())
120        .chain([0]) // Make the last offset to be the full-length of the dataset.
121        .scan(0_usize, |offset, len| {
122            let start = *offset;
123            *offset += len;
124            Some(start)
125        })
126        .collect()
127}
128
/// Aggregate statistics over a [`Manifest`], as computed by [`Manifest::summary`].
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments in the dataset.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Combined size in bytes of data files whose size is recorded.
    pub total_files_size: u64,
    /// Number of fragments that have a deletion file.
    pub total_deletion_files: u64,
    /// Rows stored in data files (dataset rows plus deleted rows).
    pub total_data_file_rows: u64,
    /// Deleted rows recorded in deletion files (where the count is known).
    pub total_deletion_file_rows: u64,
    /// Rows in the dataset.
    pub total_rows: u64,
}
139
140impl From<ManifestSummary> for BTreeMap<String, String> {
141    fn from(summary: ManifestSummary) -> Self {
142        let mut stats_map = Self::new();
143        stats_map.insert(
144            "total_fragments".to_string(),
145            summary.total_fragments.to_string(),
146        );
147        stats_map.insert(
148            "total_data_files".to_string(),
149            summary.total_data_files.to_string(),
150        );
151        stats_map.insert(
152            "total_files_size".to_string(),
153            summary.total_files_size.to_string(),
154        );
155        stats_map.insert(
156            "total_deletion_files".to_string(),
157            summary.total_deletion_files.to_string(),
158        );
159        stats_map.insert(
160            "total_data_file_rows".to_string(),
161            summary.total_data_file_rows.to_string(),
162        );
163        stats_map.insert(
164            "total_deletion_file_rows".to_string(),
165            summary.total_deletion_file_rows.to_string(),
166        );
167        stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
168        stats_map
169    }
170}
171
impl Manifest {
    /// Create a brand-new manifest at version 1.
    ///
    /// Commit-time fields (timestamp, feature flags, index section, tag,
    /// transaction file) start unset; config and table metadata start empty.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // The local schema excludes fields with non-default storage classes (blobs).
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    /// Create the manifest for the next version of a dataset, carrying over
    /// branch, config, table metadata, base paths, and the fragment/row-id
    /// watermarks from `previous`.  Commit-time fields are reset and are
    /// expected to be filled in on commit.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Keep the previous blob dataset version unless a new one is supplied.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    /// - If the shallow clone is for branch, ref_name is the source branch
    ///
    /// Every data file and deletion file that does not already have a
    /// `base_id` is pointed at a new base path (`ref_base_id` -> `ref_path`)
    /// registered in `base_paths`, so the clone keeps resolving the source
    /// dataset's files.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                // Files without a base keep pointing at the source dataset.
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                // Register the source location as a new (non-root) base path.
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        // Falls back to the Unix epoch if the value is out of DateTime's range.
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value (nanoseconds since the Unix epoch)
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Get a mutable reference to the config
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Get a mutable reference to the table metadata
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Get a mutable reference to the schema metadata
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Get a mutable reference to the field metadata for a specific field id
    ///
    /// Returns None if the field does not exist in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Set the `config` from an iterator
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        // NOTE: panics if a fragment id does not fit in u32.
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        // Fragments with an id above the older manifest's high water mark are new.
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Binary search over the precomputed offsets; on a miss, step back to
        // the fragment whose span contains `start`.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop once the fragment no longer overlaps [start, end).
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// Whether data files use the legacy (v1) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Get the summary information of a manifest.
    ///
    /// This function calculates various statistics about the manifest, including:
    /// - total_files_size: Total size of all data files in bytes
    /// - total_fragments: Total number of fragments in the dataset
    /// - total_data_files: Total number of data files across all fragments
    /// - total_deletion_files: Total number of deletion files
    /// - total_data_file_rows: Total number of rows in data files
    /// - total_deletion_file_rows: Total number of deleted rows in deletion files
    /// - total_rows: Total number of rows in the dataset
    pub fn summary(&self) -> ManifestSummary {
        // Calculate total fragments
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    // Count data files in the current fragment
                    summary.total_data_files += f.files.len() as u64;
                    // Sum the number of rows for the current fragment (if available)
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    // Sum file sizes for all data files in the current fragment (if available)
                    for data_file in &f.files {
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    // Check and count if the current fragment has a deletion file
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    // Sum the number of deleted rows from the deletion file (if available)
                    if let Some(deletion_file) = &f.deletion_file {
                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
                            summary.total_deletion_file_rows += num_deleted as u64;
                        }
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // NOTE(review): assumes `num_rows()` reports live rows (physical minus
        // deleted), so physical data-file rows = live + deleted — confirm.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
572
/// A base location that manifest entries can reference by id.
///
/// Data files and deletion files may carry a `base_id` that resolves to one
/// of these paths instead of the dataset root (used e.g. by shallow clones).
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique identifier for this base path within the manifest.
    pub id: u32,
    /// Optional human-readable name for this base.
    pub name: Option<String>,
    /// Whether this base is the dataset root or a data-only base.
    pub is_dataset_root: bool,
    /// The full URI string (e.g., "s3://bucket/path")
    pub path: String,
}
581
582impl BasePath {
583    /// Create a new BasePath
584    ///
585    /// # Arguments
586    ///
587    /// * `id` - Unique identifier for this base path
588    /// * `path` - Full URI string (e.g., "s3://bucket/path", "/local/path")
589    /// * `name` - Optional human-readable name for this base
590    /// * `is_dataset_root` - Whether this is the dataset root or a data-only base
591    pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
592        Self {
593            id,
594            name,
595            is_dataset_root,
596            path,
597        }
598    }
599
600    /// Extract the object store path from this BasePath's URI.
601    ///
602    /// This is a synchronous operation that parses the URI without initializing an object store.
603    pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
604        ObjectStore::extract_path_from_uri(registry, &self.path)
605    }
606}
607
impl DeepSizeOf for BasePath {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // NOTE(review): `path` is counted twice — presumably to account for a
        // derived/cached copy held elsewhere; confirm, otherwise drop the `* 2`.
        // The bool is added explicitly even though `deep_size_of_children`
        // conventionally counts only heap children — verify intent.
        self.name.deep_size_of_children(context)
            + self.path.deep_size_of_children(context) * 2
            + size_of::<bool>()
    }
}
615
/// Identifies the library (and its version) that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Version string, expected to be semver: "major.minor.patch[-tag]".
    pub version: String,
}
621
/// Describes the on-disk format of the dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (currently always "lance").
    pub file_format: String,
    /// File format version string (parseable as a `LanceFileVersion`).
    pub version: String,
}
627
/// The only data file format currently produced: the Lance file format.
const LANCE_FORMAT_NAME: &str = "lance";
629
630impl DataStorageFormat {
631    pub fn new(version: LanceFileVersion) -> Self {
632        Self {
633            file_format: LANCE_FORMAT_NAME.to_string(),
634            version: version.resolve().to_string(),
635        }
636    }
637
638    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
639        self.version.parse::<LanceFileVersion>()
640    }
641}
642
643impl Default for DataStorageFormat {
644    fn default() -> Self {
645        Self::new(LanceFileVersion::default())
646    }
647}
648
649impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
650    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
651        Self {
652            file_format: pb.file_format,
653            version: pb.version,
654        }
655    }
656}
657
/// Which semver component [`WriterVersion::bump`] should increment.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
664
665impl WriterVersion {
666    /// Try to parse the version string as a semver string. Returns None if
667    /// not successful.
668    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
669        // First split by '-' to separate the version from the pre-release tag
670        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
671            (
672                &self.version[..dash_idx],
673                Some(&self.version[dash_idx + 1..]),
674            )
675        } else {
676            (self.version.as_str(), None)
677        };
678
679        let mut parts = version_part.split('.');
680        let major = parts.next().unwrap_or("0").parse().ok()?;
681        let minor = parts.next().unwrap_or("0").parse().ok()?;
682        let patch = parts.next().unwrap_or("0").parse().ok()?;
683
684        Some((major, minor, patch, tag))
685    }
686
687    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
688        self.semver()
689            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
690    }
691
692    /// Return true if self is older than the given major/minor/patch
693    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
694        let version = self.semver_or_panic();
695        (version.0, version.1, version.2) < (major, minor, patch)
696    }
697
698    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
699        let parts = self.semver_or_panic();
700        let tag = if keep_tag { parts.3 } else { None };
701        let new_parts = match part {
702            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
703            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
704            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
705        };
706        let new_version = if let Some(tag) = tag {
707            format!("{}.{}.{}-{}", new_parts.0, new_parts.1, new_parts.2, tag)
708        } else {
709            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
710        };
711        Self {
712            library: self.library.clone(),
713            version: new_version,
714        }
715    }
716}
717
impl Default for WriterVersion {
    /// The version of the library currently running, baked in at compile time.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
737
// Associates Manifest with its protobuf message type so generic helpers
// (e.g. `read_struct`) can round-trip it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
741
742impl From<pb::BasePath> for BasePath {
743    fn from(p: pb::BasePath) -> Self {
744        Self::new(p.id, p.path, p.name, p.is_dataset_root)
745    }
746}
747
748impl From<BasePath> for pb::BasePath {
749    fn from(p: BasePath) -> Self {
750        Self {
751            id: p.id,
752            name: p.name,
753            is_dataset_root: p.is_dataset_root,
754            path: p.path,
755        }
756    }
757}
758
impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    /// Decode a protobuf manifest, validating fragments and inferring the
    /// data storage format when the field is absent (older manifests).
    fn try_from(p: pb::Manifest) -> Result<Self> {
        // Recombine protobuf seconds/nanos into a single u128 nanosecond count.
        // NOTE(review): `ts.seconds as u128` assumes a non-negative timestamp;
        // a pre-epoch value would wrap — confirm writers never produce one.
        // (`1e9 as u128` is exactly 1_000_000_000.)
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        // We only use the writer version if it is fully set.
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.schema_metadata,
        };

        // Stable row ids require every fragment to carry row-id metadata.
        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    // If there are fragments, they are a better indicator
                    DataStorageFormat::new(inferred_version)
                } else {
                    // No fragments to inspect, best we can do is look at writer flags
                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                        DataStorageFormat::new(LanceFileVersion::Stable)
                    } else {
                        DataStorageFormat::new(LanceFileVersion::Legacy)
                    }
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            branch: p.branch,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            // Empty protobuf strings encode "unset".
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            table_metadata: p.table_metadata,
            // 0 encodes "no blob dataset".
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
            base_paths: p
                .base_paths
                .iter()
                .map(|item| (item.id, item.clone().into()))
                .collect(),
        })
    }
}
853
854impl From<&Manifest> for pb::Manifest {
855    fn from(m: &Manifest) -> Self {
856        let timestamp_nanos = if m.timestamp_nanos == 0 {
857            None
858        } else {
859            let nanos = m.timestamp_nanos % 1e9 as u128;
860            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
861            Some(Timestamp {
862                seconds,
863                nanos: nanos as i32,
864            })
865        };
866        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
867        Self {
868            fields: fields_with_meta.fields.0,
869            schema_metadata: m
870                .schema
871                .metadata
872                .iter()
873                .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
874                .collect(),
875            version: m.version,
876            branch: m.branch.clone(),
877            writer_version: m
878                .writer_version
879                .as_ref()
880                .map(|wv| pb::manifest::WriterVersion {
881                    library: wv.library.clone(),
882                    version: wv.version.clone(),
883                }),
884            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
885            table_metadata: m.table_metadata.clone(),
886            version_aux_data: m.version_aux_data as u64,
887            index_section: m.index_section.map(|i| i as u64),
888            timestamp: timestamp_nanos,
889            tag: m.tag.clone().unwrap_or_default(),
890            reader_feature_flags: m.reader_feature_flags,
891            writer_feature_flags: m.writer_feature_flags,
892            max_fragment_id: m.max_fragment_id,
893            transaction_file: m.transaction_file.clone().unwrap_or_default(),
894            next_row_id: m.next_row_id,
895            data_format: Some(pb::manifest::DataStorageFormat {
896                file_format: m.data_storage_format.file_format.clone(),
897                version: m.data_storage_format.version.clone(),
898            }),
899            config: m.config.clone(),
900            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
901            base_paths: m
902                .base_paths
903                .values()
904                .map(|base_path| pb::BasePath {
905                    id: base_path.id,
906                    name: base_path.name.clone(),
907                    is_dataset_root: base_path.is_dataset_root,
908                    path: base_path.path.clone(),
909                })
910                .collect(),
911        }
912    }
913}
914
915#[async_trait]
916pub trait SelfDescribingFileReader {
917    /// Open a file reader without any cached schema
918    ///
919    /// In this case the schema will first need to be loaded
920    /// from the file itself.
921    ///
922    /// When loading files from a dataset it is preferable to use
923    /// the fragment reader to avoid this overhead.
924    async fn try_new_self_described(
925        object_store: &ObjectStore,
926        path: &Path,
927        cache: Option<&LanceCache>,
928    ) -> Result<Self>
929    where
930        Self: Sized,
931    {
932        let reader = object_store.open(path).await?;
933        Self::try_new_self_described_from_reader(reader.into(), cache).await
934    }
935
936    async fn try_new_self_described_from_reader(
937        reader: Arc<dyn Reader>,
938        cache: Option<&LanceCache>,
939    ) -> Result<Self>
940    where
941        Self: Sized;
942}
943
944#[async_trait]
945impl SelfDescribingFileReader for FileReader {
946    async fn try_new_self_described_from_reader(
947        reader: Arc<dyn Reader>,
948        cache: Option<&LanceCache>,
949    ) -> Result<Self> {
950        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
951        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
952            message: format!(
953                "Attempt to open file at {} as self-describing but it did not contain a manifest",
954                reader.path(),
955            ),
956            location: location!(),
957        })?;
958        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
959        if manifest.should_use_legacy_format() {
960            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
961        }
962        let schema = manifest.schema;
963        let max_field_id = schema.max_field_id().unwrap_or_default();
964        Self::try_new_from_reader(
965            reader.path(),
966            reader.clone(),
967            Some(metadata),
968            schema,
969            0,
970            0,
971            max_field_id,
972            cache,
973        )
974        .await
975    }
976}
977
#[cfg(test)]
mod tests {
    use crate::format::{DataFile, DeletionFile, DeletionFileType};
    use std::num::NonZero;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    /// `WriterVersion::default()` should reflect this crate's cargo version,
    /// and bumping any version part must produce a strictly newer version
    /// according to `older_than`.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();

        // Parse the actual cargo version to check if it has a pre-release tag
        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major,minor,patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                expected_tag
            )
        );

        // Verify the base version (without tag) matches CARGO_PKG_VERSION
        let base_version = cargo_version.split('-').next().unwrap();
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            base_version
        );

        // Bumping each part individually must always yield a newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    /// `fragments_by_offset_range` should return (starting row offset,
    /// fragment) pairs for every fragment overlapping the requested range,
    /// and nothing for ranges past the end of the dataset.
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        // Three fragments of 10, 15 and 20 rows → row offsets 0, 10 and 25.
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            HashMap::new(),
        );

        // Exactly covers the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Straddles the boundary between fragment 0 and fragment 1.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Starts inside fragment 1 and runs past the end of fragment 2.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    /// `max_field_id` must consider field ids referenced by the fragments'
    /// data files, not only the schema (a data file below references id 43,
    /// which is larger than any id in the schema).
    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    // Field id 43 exceeds anything declared in the schema.
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    /// Mutations made through `config_mut()` (insert and remove) should be
    /// visible via `manifest.config`.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }

    /// Exercises `Manifest::summary()` across four scenarios: an empty
    /// manifest, fragments with zero-row files, fragments with real row
    /// counts, and a fragment carrying a deletion file.
    #[test]
    fn test_manifest_summary() {
        // Step 1: test empty manifest summary
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Step 2: write empty files and verify summary
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Fragments/files are counted even though they hold no rows.
        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // Step 3: write real data and verify summary
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425); // 100 + 250 + 75
        assert_eq!(real_data_summary.total_files_size, 0); // Zero for unknown
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        let file_version = LanceFileVersion::default();
        // Step 4: write deletion files and verify summary
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        // 10 of the 50 physical rows are marked deleted.
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let deletion_summary = manifest_with_deletion.summary();
        assert_eq!(deletion_summary.total_rows, 40); // 50 - 10
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // Just verify the conversion into a string map is OK
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
}