lance_table/format/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use deepsize::DeepSizeOf;
11use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
12use lance_file::reader::FileReader;
13use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
14use lance_io::traits::{ProtoStruct, Reader};
15use object_store::path::Path;
16use prost::Message;
17use prost_types::Timestamp;
18
19use super::Fragment;
20use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::{Schema, StorageClass};
24use lance_core::{Error, Result};
25use lance_io::object_store::ObjectStore;
26use lance_io::utils::read_struct;
27use snafu::location;
28
29/// Manifest of a dataset
30///
31///  * Schema
32///  * Version
33///  * Fragments.
34///  * Indices.
35#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
36pub struct Manifest {
37    /// Dataset schema.
38    pub schema: Schema,
39
40    /// Local schema, only containing fields with the default storage class (not blobs)
41    pub local_schema: Schema,
42
43    /// Dataset version
44    pub version: u64,
45
46    /// Version of the writer library that wrote this manifest.
47    pub writer_version: Option<WriterVersion>,
48
49    /// Fragments, the pieces to build the dataset.
50    ///
51    /// This list is stored in order, sorted by fragment id.  However, the fragment id
52    /// sequence may have gaps.
53    pub fragments: Arc<Vec<Fragment>>,
54
55    /// The file position of the version aux data.
56    pub version_aux_data: usize,
57
58    /// The file position of the index metadata.
59    pub index_section: Option<usize>,
60
61    /// The creation timestamp with nanosecond resolution as 128-bit integer
62    pub timestamp_nanos: u128,
63
64    /// An optional string tag for this version
65    pub tag: Option<String>,
66
67    /// The reader flags
68    pub reader_feature_flags: u64,
69
70    /// The writer flags
71    pub writer_feature_flags: u64,
72
73    /// The max fragment id used so far  
74    /// None means never set, Some(0) means max ID used so far is 0
75    pub max_fragment_id: Option<u32>,
76
77    /// The path to the transaction file, relative to the root of the dataset
78    pub transaction_file: Option<String>,
79
80    /// Precomputed logic offset of each fragment
81    /// accelerating the fragment search using offset ranges.
82    fragment_offsets: Vec<usize>,
83
84    /// The max row id used so far.
85    pub next_row_id: u64,
86
87    /// The storage format of the data files.
88    pub data_storage_format: DataStorageFormat,
89
90    /// Table configuration.
91    pub config: HashMap<String, String>,
92
93    /// Blob dataset version
94    pub blob_dataset_version: Option<u64>,
95
96    /* external base paths */
97    pub base_paths: HashMap<u32, BasePath>,
98}
99
100// We use the most significant bit to indicate that a transaction is detached
101pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;
102
103pub fn is_detached_version(version: u64) -> bool {
104    version & DETACHED_VERSION_MASK != 0
105}
106
107fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
108    fragments
109        .iter()
110        .map(|f| f.num_rows().unwrap_or_default())
111        .chain([0]) // Make the last offset to be the full-length of the dataset.
112        .scan(0_usize, |offset, len| {
113            let start = *offset;
114            *offset += len;
115            Some(start)
116        })
117        .collect()
118}
119
120impl Manifest {
121    pub fn new(
122        schema: Schema,
123        fragments: Arc<Vec<Fragment>>,
124        data_storage_format: DataStorageFormat,
125        blob_dataset_version: Option<u64>,
126        base_paths: HashMap<u32, BasePath>,
127    ) -> Self {
128        let fragment_offsets = compute_fragment_offsets(&fragments);
129        let local_schema = schema.retain_storage_class(StorageClass::Default);
130
131        Self {
132            schema,
133            local_schema,
134            version: 1,
135            writer_version: Some(WriterVersion::default()),
136            fragments,
137            version_aux_data: 0,
138            index_section: None,
139            timestamp_nanos: 0,
140            tag: None,
141            reader_feature_flags: 0,
142            writer_feature_flags: 0,
143            max_fragment_id: None,
144            transaction_file: None,
145            fragment_offsets,
146            next_row_id: 0,
147            data_storage_format,
148            config: HashMap::new(),
149            blob_dataset_version,
150            base_paths,
151        }
152    }
153
154    pub fn new_from_previous(
155        previous: &Self,
156        schema: Schema,
157        fragments: Arc<Vec<Fragment>>,
158        new_blob_version: Option<u64>,
159    ) -> Self {
160        let fragment_offsets = compute_fragment_offsets(&fragments);
161        let local_schema = schema.retain_storage_class(StorageClass::Default);
162
163        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);
164
165        Self {
166            schema,
167            local_schema,
168            version: previous.version + 1,
169            writer_version: Some(WriterVersion::default()),
170            fragments,
171            version_aux_data: 0,
172            index_section: None, // Caller should update index if they want to keep them.
173            timestamp_nanos: 0,  // This will be set on commit
174            tag: None,
175            reader_feature_flags: 0, // These will be set on commit
176            writer_feature_flags: 0, // These will be set on commit
177            max_fragment_id: previous.max_fragment_id,
178            transaction_file: None,
179            fragment_offsets,
180            next_row_id: previous.next_row_id,
181            data_storage_format: previous.data_storage_format.clone(),
182            config: previous.config.clone(),
183            blob_dataset_version,
184            base_paths: previous.base_paths.clone(),
185        }
186    }
187
188    pub fn shallow_clone(
189        &self,
190        ref_name: Option<String>,
191        ref_path: String,
192        transaction_file: String,
193    ) -> Self {
194        let new_base_id = self.base_paths.keys().max().map(|id| *id + 1).unwrap_or(0);
195        let cloned_fragments = self
196            .fragments
197            .as_ref()
198            .iter()
199            .map(|fragment| {
200                let mut cloned_fragment = fragment.clone();
201                cloned_fragment.files = cloned_fragment
202                    .files
203                    .into_iter()
204                    .map(|mut file| {
205                        file.base_id = Some(new_base_id);
206                        file
207                    })
208                    .collect();
209
210                if let Some(mut deletion) = cloned_fragment.deletion_file.take() {
211                    deletion.base_id = Some(new_base_id);
212                    cloned_fragment.deletion_file = Some(deletion);
213                }
214
215                cloned_fragment
216            })
217            .collect::<Vec<_>>();
218
219        Self {
220            schema: self.schema.clone(),
221            local_schema: self.local_schema.clone(),
222            version: self.version,
223            writer_version: self.writer_version.clone(),
224            fragments: Arc::new(cloned_fragments),
225            version_aux_data: self.version_aux_data,
226            // TODO: apply shallow clone to indexes
227            index_section: None,
228            timestamp_nanos: self.timestamp_nanos,
229            reader_feature_flags: self.reader_feature_flags,
230            tag: None,
231            writer_feature_flags: self.writer_feature_flags,
232            max_fragment_id: self.max_fragment_id,
233            transaction_file: Some(transaction_file),
234            fragment_offsets: self.fragment_offsets.clone(),
235            next_row_id: self.next_row_id,
236            data_storage_format: self.data_storage_format.clone(),
237            config: self.config.clone(),
238            blob_dataset_version: self.blob_dataset_version,
239            base_paths: {
240                let mut base_paths = self.base_paths.clone();
241                let base_path = BasePath {
242                    id: new_base_id,
243                    name: ref_name,
244                    is_dataset_root: true,
245                    path: ref_path,
246                };
247                base_paths.insert(new_base_id, base_path);
248                base_paths
249            },
250        }
251    }
252
253    /// Return the `timestamp_nanos` value as a Utc DateTime
254    pub fn timestamp(&self) -> DateTime<Utc> {
255        let nanos = self.timestamp_nanos % 1_000_000_000;
256        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
257        Utc.from_utc_datetime(
258            &DateTime::from_timestamp(seconds, nanos as u32)
259                .unwrap_or_default()
260                .naive_utc(),
261        )
262    }
263
264    /// Set the `timestamp_nanos` value from a Utc DateTime
265    pub fn set_timestamp(&mut self, nanos: u128) {
266        self.timestamp_nanos = nanos;
267    }
268
269    /// Set the `config` from an iterator
270    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
271        self.config.extend(upsert_values);
272    }
273
274    /// Delete `config` keys using a slice of keys
275    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
276        self.config
277            .retain(|key, _| !delete_keys.contains(&key.as_str()));
278    }
279
280    /// Replaces the schema metadata with the given key-value pairs.
281    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
282        self.schema.metadata = new_metadata;
283    }
284
285    /// Replaces the metadata of the field with the given id with the given key-value pairs.
286    ///
287    /// If the field does not exist in the schema, this is a no-op.
288    pub fn replace_field_metadata(
289        &mut self,
290        field_id: i32,
291        new_metadata: HashMap<String, String>,
292    ) -> Result<()> {
293        if let Some(field) = self.schema.field_by_id_mut(field_id) {
294            field.metadata = new_metadata;
295            Ok(())
296        } else {
297            Err(Error::invalid_input(
298                format!(
299                    "Field with id {} does not exist for replace_field_metadata",
300                    field_id
301                ),
302                location!(),
303            ))
304        }
305    }
306
307    /// Check the current fragment list and update the high water mark
308    pub fn update_max_fragment_id(&mut self) {
309        // If there are no fragments, don't update max_fragment_id
310        if self.fragments.is_empty() {
311            return;
312        }
313
314        let max_fragment_id = self
315            .fragments
316            .iter()
317            .map(|f| f.id)
318            .max()
319            .unwrap() // Safe because we checked fragments is not empty
320            .try_into()
321            .unwrap();
322
323        match self.max_fragment_id {
324            None => {
325                // First time being set
326                self.max_fragment_id = Some(max_fragment_id);
327            }
328            Some(current_max) => {
329                // Only update if the computed max is greater than current
330                // This preserves the high water mark even when fragments are deleted
331                if max_fragment_id > current_max {
332                    self.max_fragment_id = Some(max_fragment_id);
333                }
334            }
335        }
336    }
337
338    /// Return the max fragment id.
339    /// Note this does not support recycling of fragment ids.
340    ///
341    /// This will return None if there are no fragments and max_fragment_id was never set.
342    pub fn max_fragment_id(&self) -> Option<u64> {
343        if let Some(max_id) = self.max_fragment_id {
344            // Return the stored high water mark
345            Some(max_id.into())
346        } else {
347            // Not yet set, compute from fragment list
348            self.fragments.iter().map(|f| f.id).max()
349        }
350    }
351
352    /// Get the max used field id
353    ///
354    /// This is different than [Schema::max_field_id] because it also considers
355    /// the field ids in the data files that have been dropped from the schema.
356    pub fn max_field_id(&self) -> i32 {
357        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
358        let fragment_max_id = self
359            .fragments
360            .iter()
361            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
362            .max()
363            .copied();
364        let fragment_max_id = fragment_max_id.unwrap_or(-1);
365        schema_max_id.max(fragment_max_id)
366    }
367
368    /// Return the fragments that are newer than the given manifest.
369    /// Note this does not support recycling of fragment ids.
370    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
371        if since.version >= self.version {
372            return Err(Error::io(
373                format!(
374                    "fragments_since: given version {} is newer than manifest version {}",
375                    since.version, self.version
376                ),
377                location!(),
378            ));
379        }
380        let start = since.max_fragment_id();
381        Ok(self
382            .fragments
383            .iter()
384            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
385            .cloned()
386            .collect())
387    }
388
389    /// Find the fragments that contain the rows, identified by the offset range.
390    ///
391    /// Note that the offsets are the logical offsets of rows, not row IDs.
392    ///
393    ///
394    /// Parameters
395    /// ----------
396    /// range: Range<usize>
397    ///     Offset range
398    ///
399    /// Returns
400    /// -------
401    /// Vec<(usize, Fragment)>
402    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
403    ///
404    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
405        let start = range.start;
406        let end = range.end;
407        let idx = self
408            .fragment_offsets
409            .binary_search(&start)
410            .unwrap_or_else(|idx| idx - 1);
411
412        let mut fragments = vec![];
413        for i in idx..self.fragments.len() {
414            if self.fragment_offsets[i] >= end
415                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
416                    <= start
417            {
418                break;
419            }
420            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
421        }
422
423        fragments
424    }
425
426    /// Whether the dataset uses stable row ids.
427    pub fn uses_stable_row_ids(&self) -> bool {
428        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
429    }
430
431    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
432    /// and can be used to create a dataset
433    pub fn serialized(&self) -> Vec<u8> {
434        let pb_manifest: pb::Manifest = self.into();
435        pb_manifest.encode_to_vec()
436    }
437
438    pub fn should_use_legacy_format(&self) -> bool {
439        self.data_storage_format.version == LEGACY_FORMAT_VERSION
440    }
441}
442
443#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
444pub struct BasePath {
445    pub id: u32,
446    pub name: Option<String>,
447    pub is_dataset_root: bool,
448    pub path: String,
449}
450
451#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
452pub struct WriterVersion {
453    pub library: String,
454    pub version: String,
455}
456
457#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
458pub struct DataStorageFormat {
459    pub file_format: String,
460    pub version: String,
461}
462
463const LANCE_FORMAT_NAME: &str = "lance";
464
465impl DataStorageFormat {
466    pub fn new(version: LanceFileVersion) -> Self {
467        Self {
468            file_format: LANCE_FORMAT_NAME.to_string(),
469            version: version.resolve().to_string(),
470        }
471    }
472
473    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
474        self.version.parse::<LanceFileVersion>()
475    }
476}
477
478impl Default for DataStorageFormat {
479    fn default() -> Self {
480        Self::new(LanceFileVersion::default())
481    }
482}
483
484impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
485    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
486        Self {
487            file_format: pb.file_format,
488            version: pb.version,
489        }
490    }
491}
492
493#[derive(Debug, Clone, Copy, PartialEq, Eq)]
494pub enum VersionPart {
495    Major,
496    Minor,
497    Patch,
498}
499
500impl WriterVersion {
501    /// Try to parse the version string as a semver string. Returns None if
502    /// not successful.
503    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
504        let mut parts = self.version.split('.');
505        let major = parts.next().unwrap_or("0").parse().ok()?;
506        let minor = parts.next().unwrap_or("0").parse().ok()?;
507        let patch = parts.next().unwrap_or("0").parse().ok()?;
508        let tag = parts.next();
509        Some((major, minor, patch, tag))
510    }
511
512    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
513        self.semver()
514            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
515    }
516
517    /// Return true if self is older than the given major/minor/patch
518    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
519        let version = self.semver_or_panic();
520        (version.0, version.1, version.2) < (major, minor, patch)
521    }
522
523    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
524        let parts = self.semver_or_panic();
525        let tag = if keep_tag { parts.3 } else { None };
526        let new_parts = match part {
527            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
528            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
529            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
530        };
531        let new_version = if let Some(tag) = tag {
532            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
533        } else {
534            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
535        };
536        Self {
537            library: self.library.clone(),
538            version: new_version,
539        }
540    }
541}
542
543impl Default for WriterVersion {
544    #[cfg(not(test))]
545    fn default() -> Self {
546        Self {
547            library: "lance".to_string(),
548            version: env!("CARGO_PKG_VERSION").to_string(),
549        }
550    }
551
552    // Unit tests always run as if they are in the next version.
553    #[cfg(test)]
554    fn default() -> Self {
555        Self {
556            library: "lance".to_string(),
557            version: env!("CARGO_PKG_VERSION").to_string(),
558        }
559        .bump(VersionPart::Patch, true)
560    }
561}
562
563impl ProtoStruct for Manifest {
564    type Proto = pb::Manifest;
565}
566
567impl From<pb::BasePath> for BasePath {
568    fn from(p: pb::BasePath) -> Self {
569        Self {
570            id: p.id,
571            name: p.name,
572            is_dataset_root: p.is_dataset_root,
573            path: p.path,
574        }
575    }
576}
577
578impl TryFrom<pb::Manifest> for Manifest {
579    type Error = Error;
580
581    fn try_from(p: pb::Manifest) -> Result<Self> {
582        let timestamp_nanos = p.timestamp.map(|ts| {
583            let sec = ts.seconds as u128 * 1e9 as u128;
584            let nanos = ts.nanos as u128;
585            sec + nanos
586        });
587        // We only use the writer version if it is fully set.
588        let writer_version = match p.writer_version {
589            Some(pb::manifest::WriterVersion { library, version }) => {
590                Some(WriterVersion { library, version })
591            }
592            _ => None,
593        };
594        let fragments = Arc::new(
595            p.fragments
596                .into_iter()
597                .map(Fragment::try_from)
598                .collect::<Result<Vec<_>>>()?,
599        );
600        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
601        let fields_with_meta = FieldsWithMeta {
602            fields: Fields(p.fields),
603            metadata: p.metadata,
604        };
605
606        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
607            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
608        {
609            return Err(Error::Internal {
610                message: "All fragments must have row ids".into(),
611                location: location!(),
612            });
613        }
614
615        let data_storage_format = match p.data_format {
616            None => {
617                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
618                    // If there are fragments, they are a better indicator
619                    DataStorageFormat::new(inferred_version)
620                } else {
621                    // No fragments to inspect, best we can do is look at writer flags
622                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
623                        DataStorageFormat::new(LanceFileVersion::Stable)
624                    } else {
625                        DataStorageFormat::new(LanceFileVersion::Legacy)
626                    }
627                }
628            }
629            Some(format) => DataStorageFormat::from(format),
630        };
631
632        let schema = Schema::from(fields_with_meta);
633        let local_schema = schema.retain_storage_class(StorageClass::Default);
634
635        Ok(Self {
636            schema,
637            local_schema,
638            version: p.version,
639            writer_version,
640            version_aux_data: p.version_aux_data as usize,
641            index_section: p.index_section.map(|i| i as usize),
642            timestamp_nanos: timestamp_nanos.unwrap_or(0),
643            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
644            reader_feature_flags: p.reader_feature_flags,
645            writer_feature_flags: p.writer_feature_flags,
646            max_fragment_id: p.max_fragment_id,
647            fragments,
648            transaction_file: if p.transaction_file.is_empty() {
649                None
650            } else {
651                Some(p.transaction_file)
652            },
653            fragment_offsets,
654            next_row_id: p.next_row_id,
655            data_storage_format,
656            config: p.config,
657            blob_dataset_version: if p.blob_dataset_version == 0 {
658                None
659            } else {
660                Some(p.blob_dataset_version)
661            },
662            base_paths: p
663                .base_paths
664                .iter()
665                .map(|item| (item.id, item.clone().into()))
666                .collect(),
667        })
668    }
669}
670
671impl From<&Manifest> for pb::Manifest {
672    fn from(m: &Manifest) -> Self {
673        let timestamp_nanos = if m.timestamp_nanos == 0 {
674            None
675        } else {
676            let nanos = m.timestamp_nanos % 1e9 as u128;
677            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
678            Some(Timestamp {
679                seconds,
680                nanos: nanos as i32,
681            })
682        };
683        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
684        Self {
685            fields: fields_with_meta.fields.0,
686            version: m.version,
687            writer_version: m
688                .writer_version
689                .as_ref()
690                .map(|wv| pb::manifest::WriterVersion {
691                    library: wv.library.clone(),
692                    version: wv.version.clone(),
693                }),
694            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
695            metadata: fields_with_meta.metadata,
696            version_aux_data: m.version_aux_data as u64,
697            index_section: m.index_section.map(|i| i as u64),
698            timestamp: timestamp_nanos,
699            tag: m.tag.clone().unwrap_or_default(),
700            reader_feature_flags: m.reader_feature_flags,
701            writer_feature_flags: m.writer_feature_flags,
702            max_fragment_id: m.max_fragment_id,
703            transaction_file: m.transaction_file.clone().unwrap_or_default(),
704            next_row_id: m.next_row_id,
705            data_format: Some(pb::manifest::DataStorageFormat {
706                file_format: m.data_storage_format.file_format.clone(),
707                version: m.data_storage_format.version.clone(),
708            }),
709            config: m.config.clone(),
710            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
711            base_paths: m
712                .base_paths
713                .values()
714                .map(|base_path| pb::BasePath {
715                    id: base_path.id,
716                    name: base_path.name.clone(),
717                    is_dataset_root: base_path.is_dataset_root,
718                    path: base_path.path.clone(),
719                })
720                .collect(),
721        }
722    }
723}
724
725#[async_trait]
726pub trait SelfDescribingFileReader {
727    /// Open a file reader without any cached schema
728    ///
729    /// In this case the schema will first need to be loaded
730    /// from the file itself.
731    ///
732    /// When loading files from a dataset it is preferable to use
733    /// the fragment reader to avoid this overhead.
734    async fn try_new_self_described(
735        object_store: &ObjectStore,
736        path: &Path,
737        cache: Option<&LanceCache>,
738    ) -> Result<Self>
739    where
740        Self: Sized,
741    {
742        let reader = object_store.open(path).await?;
743        Self::try_new_self_described_from_reader(reader.into(), cache).await
744    }
745
746    async fn try_new_self_described_from_reader(
747        reader: Arc<dyn Reader>,
748        cache: Option<&LanceCache>,
749    ) -> Result<Self>
750    where
751        Self: Sized;
752}
753
754#[async_trait]
755impl SelfDescribingFileReader for FileReader {
756    async fn try_new_self_described_from_reader(
757        reader: Arc<dyn Reader>,
758        cache: Option<&LanceCache>,
759    ) -> Result<Self> {
760        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
761        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
762            message: format!(
763                "Attempt to open file at {} as self-describing but it did not contain a manifest",
764                reader.path(),
765            ),
766            location: location!(),
767        })?;
768        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
769        if manifest.should_use_legacy_format() {
770            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
771        }
772        let schema = manifest.schema;
773        let max_field_id = schema.max_field_id().unwrap_or_default();
774        Self::try_new_from_reader(
775            reader.path(),
776            reader.clone(),
777            Some(metadata),
778            schema,
779            0,
780            0,
781            max_field_id,
782            cache,
783        )
784        .await
785    }
786}
787
788#[cfg(test)]
789mod tests {
790    use crate::format::DataFile;
791
792    use super::*;
793
794    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
795    use lance_core::datatypes::Field;
796
797    #[test]
798    fn test_writer_version() {
799        let wv = WriterVersion::default();
800        assert_eq!(wv.library, "lance");
801        let parts = wv.semver().unwrap();
802        assert_eq!(
803            parts,
804            (
805                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
806                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
807                // Unit tests run against (major,minor,patch + 1)
808                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
809                None
810            )
811        );
812        assert_eq!(
813            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
814            env!("CARGO_PKG_VERSION")
815        );
816        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
817            let bumped = wv.bump(*part, false);
818            let bumped_parts = bumped.semver_or_panic();
819            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
820        }
821    }
822
823    #[test]
824    fn test_fragments_by_offset_range() {
825        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
826            "a",
827            arrow_schema::DataType::Int64,
828            false,
829        )]);
830        let schema = Schema::try_from(&arrow_schema).unwrap();
831        let fragments = vec![
832            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
833            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
834            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
835        ];
836        let manifest = Manifest::new(
837            schema,
838            Arc::new(fragments),
839            DataStorageFormat::default(),
840            /*blob_dataset_version= */ None,
841            /*ref_main_location= */ HashMap::new(),
842        );
843
844        let actual = manifest.fragments_by_offset_range(0..10);
845        assert_eq!(actual.len(), 1);
846        assert_eq!(actual[0].0, 0);
847        assert_eq!(actual[0].1.id, 0);
848
849        let actual = manifest.fragments_by_offset_range(5..15);
850        assert_eq!(actual.len(), 2);
851        assert_eq!(actual[0].0, 0);
852        assert_eq!(actual[0].1.id, 0);
853        assert_eq!(actual[1].0, 10);
854        assert_eq!(actual[1].1.id, 1);
855
856        let actual = manifest.fragments_by_offset_range(15..50);
857        assert_eq!(actual.len(), 2);
858        assert_eq!(actual[0].0, 10);
859        assert_eq!(actual[0].1.id, 1);
860        assert_eq!(actual[1].0, 25);
861        assert_eq!(actual[1].1.id, 2);
862
863        // Out of range
864        let actual = manifest.fragments_by_offset_range(45..100);
865        assert!(actual.is_empty());
866
867        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
868    }
869
870    #[test]
871    fn test_max_field_id() {
872        // Validate that max field id handles varying field ids by fragment.
873        let mut field0 =
874            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
875        field0.set_id(-1, &mut 0);
876        let mut field2 =
877            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
878        field2.set_id(-1, &mut 2);
879
880        let schema = Schema {
881            fields: vec![field0, field2],
882            metadata: Default::default(),
883        };
884        let fragments = vec![
885            Fragment {
886                id: 0,
887                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
888                deletion_file: None,
889                row_id_meta: None,
890                physical_rows: None,
891            },
892            Fragment {
893                id: 1,
894                files: vec![
895                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
896                    DataFile::new_legacy_from_fields("path3", vec![2]),
897                ],
898                deletion_file: None,
899                row_id_meta: None,
900                physical_rows: None,
901            },
902        ];
903
904        let manifest = Manifest::new(
905            schema,
906            Arc::new(fragments),
907            DataStorageFormat::default(),
908            /*blob_dataset_version= */ None,
909            /*ref_main_location= */ HashMap::new(),
910        );
911
912        assert_eq!(manifest.max_field_id(), 43);
913    }
914
915    #[test]
916    fn test_config() {
917        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
918            "a",
919            arrow_schema::DataType::Int64,
920            false,
921        )]);
922        let schema = Schema::try_from(&arrow_schema).unwrap();
923        let fragments = vec![
924            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
925            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
926            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
927        ];
928        let mut manifest = Manifest::new(
929            schema,
930            Arc::new(fragments),
931            DataStorageFormat::default(),
932            /*blob_dataset_version= */ None,
933            /*ref_main_location= */ HashMap::new(),
934        );
935
936        let mut config = manifest.config.clone();
937        config.insert("lance.test".to_string(), "value".to_string());
938        config.insert("other-key".to_string(), "other-value".to_string());
939
940        manifest.update_config(config.clone());
941        assert_eq!(manifest.config, config.clone());
942
943        config.remove("other-key");
944        manifest.delete_config_keys(&["other-key"]);
945        assert_eq!(manifest.config, config);
946    }
947}