lance_table/format/manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use deepsize::DeepSizeOf;
11use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
12use lance_file::reader::FileReader;
13use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
14use lance_io::traits::{ProtoStruct, Reader};
15use object_store::path::Path;
16use prost::Message;
17use prost_types::Timestamp;
18
19use super::Fragment;
20use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::{Schema, StorageClass};
24use lance_core::{Error, Result};
25use lance_io::object_store::ObjectStore;
26use lance_io::utils::read_struct;
27use snafu::location;
28
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader feature flags
    pub reader_feature_flags: u64,

    /// The writer feature flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far.
    ///
    /// None means never set, Some(0) means max ID used so far is 0.
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logical starting offset of each fragment (with one final
    /// trailing entry holding the dataset's total row count), used to
    /// accelerate fragment search by offset range.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,

    /// External base paths, keyed by base path id.
    pub base_paths: HashMap<u32, BasePath>,
}
99
/// The most significant bit of a version number marks the transaction as detached.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Whether `version` carries the detached-transaction marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
106
107fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
108    fragments
109        .iter()
110        .map(|f| f.num_rows().unwrap_or_default())
111        .chain([0]) // Make the last offset to be the full-length of the dataset.
112        .scan(0_usize, |offset, len| {
113            let start = *offset;
114            *offset += len;
115            Some(start)
116        })
117        .collect()
118}
119
impl Manifest {
    /// Create a brand-new manifest (version 1) for a dataset.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // The local schema excludes fields stored outside the main dataset (blobs).
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    /// Create the manifest for the next dataset version, carrying forward
    /// state (config, storage format, row-id and fragment high water marks,
    /// base paths) from `previous`.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Keep the previous blob dataset version unless explicitly replaced.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Performs a shallow_clone of the manifest entirely in memory without:
    /// - Any persistent storage operations
    /// - Modifications to the original data
    ///
    /// Files and deletion files that do not yet carry a `base_id` are pointed
    /// at the new base path (`ref_base_id`), which is registered in
    /// `base_paths` as a dataset-root entry.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                // Files without an explicit base resolve against the source dataset.
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None, // These will be set on commit
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                // Register the source dataset's root as a new base path.
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath {
                    id: ref_base_id,
                    name: ref_name,
                    is_dataset_root: true,
                    path: ref_path,
                };
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        // Falls back to the Unix epoch if the value is outside chrono's range.
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value (nanoseconds since the Unix epoch)
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Upsert entries into `config` from an iterator of key/value pairs
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// Returns an error if the field does not exist in the schema.
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Check the current fragment list and update the high water mark
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        // NOTE(review): the try_into().unwrap() panics if a fragment id
        // (u64) exceeds u32::MAX — assumed never to happen in practice.
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        // Anything with an id above `since`'s high water mark is new.
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Find the first fragment whose offset range could contain `start`.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop once a fragment starts at/after `end` or ends at/before `start`.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses stable row ids.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True when the data files use the legacy (pre-v2) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}
441
/// A registered base location that data files and deletion files may
/// reference (via their `base_id`) instead of the dataset root.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct BasePath {
    /// Unique id referenced by files through their `base_id` field.
    pub id: u32,
    /// Optional human-readable name for this base path.
    pub name: Option<String>,
    /// True when `path` points at the root of a Lance dataset
    /// (as set for shallow clones).
    pub is_dataset_root: bool,
    /// The base path itself.
    pub path: String,
}
449
/// The library and version of the writer that produced a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Version string, conventionally `major.minor.patch[.tag]`.
    pub version: String,
}
455
/// Describes the on-disk file format used for the dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name; set to `LANCE_FORMAT_NAME` by the constructor.
    pub file_format: String,
    /// File format version, parseable as a `LanceFileVersion`.
    pub version: String,
}

/// Canonical file-format name stored in `DataStorageFormat::file_format`.
const LANCE_FORMAT_NAME: &str = "lance";
463
464impl DataStorageFormat {
465    pub fn new(version: LanceFileVersion) -> Self {
466        Self {
467            file_format: LANCE_FORMAT_NAME.to_string(),
468            version: version.resolve().to_string(),
469        }
470    }
471
472    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
473        self.version.parse::<LanceFileVersion>()
474    }
475}
476
impl Default for DataStorageFormat {
    /// Defaults to the library's default Lance file version.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
482
483impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
484    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
485        Self {
486            file_format: pb.file_format,
487            version: pb.version,
488        }
489    }
490}
491
/// Identifies which component of a semver-style version to bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
498
499impl WriterVersion {
500    /// Try to parse the version string as a semver string. Returns None if
501    /// not successful.
502    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
503        let mut parts = self.version.split('.');
504        let major = parts.next().unwrap_or("0").parse().ok()?;
505        let minor = parts.next().unwrap_or("0").parse().ok()?;
506        let patch = parts.next().unwrap_or("0").parse().ok()?;
507        let tag = parts.next();
508        Some((major, minor, patch, tag))
509    }
510
511    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
512        self.semver()
513            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
514    }
515
516    /// Return true if self is older than the given major/minor/patch
517    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
518        let version = self.semver_or_panic();
519        (version.0, version.1, version.2) < (major, minor, patch)
520    }
521
522    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
523        let parts = self.semver_or_panic();
524        let tag = if keep_tag { parts.3 } else { None };
525        let new_parts = match part {
526            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
527            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
528            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
529        };
530        let new_version = if let Some(tag) = tag {
531            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
532        } else {
533            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
534        };
535        Self {
536            library: self.library.clone(),
537            version: new_version,
538        }
539    }
540}
541
impl Default for WriterVersion {
    /// Production default: this crate's name and version.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next (patch) version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
561
// Associates `Manifest` with its protobuf message type so generic helpers
// (e.g. `read_struct`, used below) know how to (de)serialize it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
565
566impl From<pb::BasePath> for BasePath {
567    fn from(p: pb::BasePath) -> Self {
568        Self {
569            id: p.id,
570            name: p.name,
571            is_dataset_root: p.is_dataset_root,
572            path: p.path,
573        }
574    }
575}
576
impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    /// Decode a manifest from its protobuf representation, validating
    /// feature-flag invariants and inferring the data storage format when
    /// the field is absent (older manifests).
    fn try_from(p: pb::Manifest) -> Result<Self> {
        // Combine seconds + nanos into one u128 nanosecond timestamp.
        // NOTE(review): `1e9 as u128` is exactly 1_000_000_000; a negative
        // `ts.seconds` (pre-epoch) would wrap when cast to u128 — assumed
        // not to occur in practice.
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        // We only use the writer version if it is fully set.
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        // Stable row ids require every fragment to carry row-id metadata.
        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    // If there are fragments, they are a better indicator
                    DataStorageFormat::new(inferred_version)
                } else {
                    // No fragments to inspect, best we can do is look at writer flags
                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                        DataStorageFormat::new(LanceFileVersion::Stable)
                    } else {
                        DataStorageFormat::new(LanceFileVersion::Legacy)
                    }
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Empty strings and zero are the protobuf sentinels for "absent";
        // map them back to None.
        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
            base_paths: p
                .base_paths
                .iter()
                .map(|item| (item.id, item.clone().into()))
                .collect(),
        })
    }
}
669
impl From<&Manifest> for pb::Manifest {
    /// Encode a manifest into its protobuf representation.
    ///
    /// Inverse of `TryFrom<pb::Manifest>`: `None` values are written as the
    /// protobuf sentinels (empty string / zero) that the decoder maps back.
    fn from(m: &Manifest) -> Self {
        // Split the u128 nanosecond timestamp back into seconds + nanos.
        // A zero timestamp is treated as "unset".
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            // `1e9 as u128` is exactly 1_000_000_000.
            let nanos = m.timestamp_nanos % 1e9 as u128;
            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
            base_paths: m
                .base_paths
                .values()
                .map(|base_path| pb::BasePath {
                    id: base_path.id,
                    name: base_path.name.clone(),
                    is_dataset_root: base_path.is_dataset_root,
                    path: base_path.path.clone(),
                })
                .collect(),
        }
    }
}
723
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Same as [`Self::try_new_self_described`], but operating on an
    /// already-opened reader instead of an object store and path.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
752
#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        // A self-describing file must embed its own manifest.
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Legacy-format files store dictionary values outside the manifest,
        // so load them into the schema before use.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
786
#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // In tests, WriterVersion::default() is the crate version with the patch
    // component bumped by one (see the cfg(test) Default impl).
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major,minor,patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    // Fragments of 10, 15, and 20 rows yield offsets [0, 10, 25, 45].
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            /*ref_main_location= */ HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    // Max field id must consider field ids present only in data files
    // (e.g. dropped columns), not just the schema.
    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            /*ref_main_location= */ HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // update_config upserts entries; delete_config_keys removes by key.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
            /*ref_main_location= */ HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}
946}