lance_table/format/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use deepsize::DeepSizeOf;
11use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
12use lance_file::reader::FileReader;
13use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
14use lance_io::traits::{ProtoStruct, Reader};
15use object_store::path::Path;
16use prost::Message;
17use prost_types::Timestamp;
18
19use super::Fragment;
20use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::{Schema, StorageClass};
24use lance_core::{Error, Result};
25use lance_io::object_store::ObjectStore;
26use lance_io::utils::read_struct;
27use snafu::location;
28
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    ///
    /// A value of 0 means "not set"; it is encoded as a missing timestamp
    /// in the protobuf form and is normally filled in at commit time.
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    ///
    /// Note: a value of 0 is ambiguous (unset vs. genuinely 0); see
    /// [Manifest::max_fragment_id] for how that is resolved.
    pub max_fragment_id: u32,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    ///
    /// Holds one starting offset per fragment plus a trailing entry equal to
    /// the total row count of the dataset.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,
}
95
/// The most significant bit of a version number marks it as detached.
pub const DETACHED_VERSION_MASK: u64 = 1 << 63;

/// Returns true if the given version number carries the detached-transaction bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
102
103fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
104    fragments
105        .iter()
106        .map(|f| f.num_rows().unwrap_or_default())
107        .chain([0]) // Make the last offset to be the full-length of the dataset.
108        .scan(0_usize, |offset, len| {
109            let start = *offset;
110            *offset += len;
111            Some(start)
112        })
113        .collect()
114}
115
impl Manifest {
    /// Create a brand-new manifest (version 1) for the given schema and fragments.
    ///
    /// Timestamp, feature flags, index section, and tag all start unset/zero;
    /// they are expected to be filled in at commit time.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // The local schema keeps only fields stored inline (default storage
        // class), i.e. excludes blob-class fields.
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: 0,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

    /// Create the next version of a manifest, carrying over the configuration,
    /// storage format, row-id counter, max fragment id, and (unless overridden)
    /// blob dataset version from `previous`.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // A new blob version takes precedence; otherwise keep the previous one.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    ///
    /// Falls back to the Unix epoch if the value does not fit chrono's range.
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the u128 nanosecond count into whole seconds + sub-second nanos.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value (nanoseconds since the Unix epoch).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Upsert `config` entries from an iterator of key-value pairs.
    /// Existing keys are overwritten; other keys are left untouched.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
        }
    }

    /// Check the current fragment list and update the high water mark
    ///
    /// The stored value only ever increases: it is left untouched if the
    /// current fragments' max id is below it.
    pub fn update_max_fragment_id(&mut self) {
        // NOTE: panics if a fragment id exceeds u32::MAX (u64 -> u32 conversion).
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap_or_default()
            .try_into()
            .unwrap();

        if max_fragment_id > self.max_fragment_id {
            self.max_fragment_id = max_fragment_id;
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if self.max_fragment_id == 0 {
            // It might not have been updated, so the best we can do is recompute
            // it from the fragment list.
            self.fragments.iter().map(|f| f.id).max()
        } else {
            Some(self.max_fragment_id.into())
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        // -1 is used as the "no fields" floor on both sides of the max().
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    ///
    /// # Errors
    ///
    /// Returns an I/O error if `since` is not strictly older than `self`.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        // "Newer" means a fragment id greater than the previous manifest's max;
        // if `since` had no fragments at all, every fragment qualifies.
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Locate the fragment whose offset span contains `start`. On a miss,
        // binary_search yields the insertion point, so step back one slot.
        // (The index is always >= 1 there because fragment_offsets[0] == 0.)
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop once a fragment starts at/after the requested end, or (for
            // the first candidate) ends at/before the requested start.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses move-stable row ids.
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True if the data files use the legacy (pre-v2) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}
342
/// Identifies the library (and its version) that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library, e.g. "lance".
    pub library: String,
    /// Version string, expected to look like "major.minor.patch" with an
    /// optional dot-separated trailing tag (see [WriterVersion::semver]).
    pub version: String,
}
348
/// Describes the file format and format version used for a dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name; the writers in this file always use "lance".
    pub file_format: String,
    /// File format version string (parseable as a LanceFileVersion).
    pub version: String,
}

// The only file format name produced by this crate.
const LANCE_FORMAT_NAME: &str = "lance";
356
357impl DataStorageFormat {
358    pub fn new(version: LanceFileVersion) -> Self {
359        Self {
360            file_format: LANCE_FORMAT_NAME.to_string(),
361            version: version.resolve().to_string(),
362        }
363    }
364
365    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
366        self.version.parse::<LanceFileVersion>()
367    }
368}
369
370impl Default for DataStorageFormat {
371    fn default() -> Self {
372        Self::new(LanceFileVersion::default())
373    }
374}
375
376impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
377    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
378        Self {
379            file_format: pb.file_format,
380            version: pb.version,
381        }
382    }
383}
384
/// Which component of a semantic version to operate on
/// (used by [WriterVersion::bump]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
391
392impl WriterVersion {
393    /// Try to parse the version string as a semver string. Returns None if
394    /// not successful.
395    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
396        let mut parts = self.version.split('.');
397        let major = parts.next().unwrap_or("0").parse().ok()?;
398        let minor = parts.next().unwrap_or("0").parse().ok()?;
399        let patch = parts.next().unwrap_or("0").parse().ok()?;
400        let tag = parts.next();
401        Some((major, minor, patch, tag))
402    }
403
404    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
405        self.semver()
406            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
407    }
408
409    /// Return true if self is older than the given major/minor/patch
410    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
411        let version = self.semver_or_panic();
412        (version.0, version.1, version.2) < (major, minor, patch)
413    }
414
415    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
416        let parts = self.semver_or_panic();
417        let tag = if keep_tag { parts.3 } else { None };
418        let new_parts = match part {
419            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
420            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
421            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
422        };
423        let new_version = if let Some(tag) = tag {
424            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
425        } else {
426            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
427        };
428        Self {
429            library: self.library.clone(),
430            version: new_version,
431        }
432    }
433}
434
impl Default for WriterVersion {
    // Production builds report the crate's own version verbatim.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next version.
    // (Patch is bumped so "newer writer" code paths are exercised in tests.)
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
454
// Associates Manifest with its protobuf message type so generic
// (de)serialization helpers (e.g. read_struct) can round-trip it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
458
459impl TryFrom<pb::Manifest> for Manifest {
460    type Error = Error;
461
462    fn try_from(p: pb::Manifest) -> Result<Self> {
463        let timestamp_nanos = p.timestamp.map(|ts| {
464            let sec = ts.seconds as u128 * 1e9 as u128;
465            let nanos = ts.nanos as u128;
466            sec + nanos
467        });
468        // We only use the writer version if it is fully set.
469        let writer_version = match p.writer_version {
470            Some(pb::manifest::WriterVersion { library, version }) => {
471                Some(WriterVersion { library, version })
472            }
473            _ => None,
474        };
475        let fragments = Arc::new(
476            p.fragments
477                .into_iter()
478                .map(Fragment::try_from)
479                .collect::<Result<Vec<_>>>()?,
480        );
481        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
482        let fields_with_meta = FieldsWithMeta {
483            fields: Fields(p.fields),
484            metadata: p.metadata,
485        };
486
487        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
488            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
489        {
490            return Err(Error::Internal {
491                message: "All fragments must have row ids".into(),
492                location: location!(),
493            });
494        }
495
496        let data_storage_format = match p.data_format {
497            None => {
498                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
499                    // If there are fragments, they are a better indicator
500                    DataStorageFormat::new(inferred_version)
501                } else {
502                    // No fragments to inspect, best we can do is look at writer flags
503                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
504                        DataStorageFormat::new(LanceFileVersion::Stable)
505                    } else {
506                        DataStorageFormat::new(LanceFileVersion::Legacy)
507                    }
508                }
509            }
510            Some(format) => DataStorageFormat::from(format),
511        };
512
513        let schema = Schema::from(fields_with_meta);
514        let local_schema = schema.retain_storage_class(StorageClass::Default);
515
516        Ok(Self {
517            schema,
518            local_schema,
519            version: p.version,
520            writer_version,
521            fragments,
522            version_aux_data: p.version_aux_data as usize,
523            index_section: p.index_section.map(|i| i as usize),
524            timestamp_nanos: timestamp_nanos.unwrap_or(0),
525            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
526            reader_feature_flags: p.reader_feature_flags,
527            writer_feature_flags: p.writer_feature_flags,
528            max_fragment_id: p.max_fragment_id,
529            transaction_file: if p.transaction_file.is_empty() {
530                None
531            } else {
532                Some(p.transaction_file)
533            },
534            fragment_offsets,
535            next_row_id: p.next_row_id,
536            data_storage_format,
537            config: p.config,
538            blob_dataset_version: if p.blob_dataset_version == 0 {
539                None
540            } else {
541                Some(p.blob_dataset_version)
542            },
543        })
544    }
545}
546
547impl From<&Manifest> for pb::Manifest {
548    fn from(m: &Manifest) -> Self {
549        let timestamp_nanos = if m.timestamp_nanos == 0 {
550            None
551        } else {
552            let nanos = m.timestamp_nanos % 1e9 as u128;
553            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
554            Some(Timestamp {
555                seconds,
556                nanos: nanos as i32,
557            })
558        };
559        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
560        Self {
561            fields: fields_with_meta.fields.0,
562            version: m.version,
563            writer_version: m
564                .writer_version
565                .as_ref()
566                .map(|wv| pb::manifest::WriterVersion {
567                    library: wv.library.clone(),
568                    version: wv.version.clone(),
569                }),
570            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
571            metadata: fields_with_meta.metadata,
572            version_aux_data: m.version_aux_data as u64,
573            index_section: m.index_section.map(|i| i as u64),
574            timestamp: timestamp_nanos,
575            tag: m.tag.clone().unwrap_or_default(),
576            reader_feature_flags: m.reader_feature_flags,
577            writer_feature_flags: m.writer_feature_flags,
578            max_fragment_id: m.max_fragment_id,
579            transaction_file: m.transaction_file.clone().unwrap_or_default(),
580            next_row_id: m.next_row_id,
581            data_format: Some(pb::manifest::DataStorageFormat {
582                file_format: m.data_storage_format.file_format.clone(),
583                version: m.data_storage_format.version.clone(),
584            }),
585            config: m.config.clone(),
586            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
587        }
588    }
589}
590
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Default implementation: open the object, then delegate to the
        // reader-based constructor.
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// As [Self::try_new_self_described], but from an already-opened reader.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
619
#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        // Read the file's footer metadata; a self-describing file must embed
        // a manifest (and thus a schema) at `manifest_position`.
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Dictionary values are stored outside the manifest struct itself;
        // load them so the schema is fully usable.
        populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
651
#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // WriterVersion::default() in test builds is the crate version with the
    // patch component bumped; verify parsing, the bump, and ordering.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major,minor,patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        // Bumping any part must always produce a strictly newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    // Three fragments of 10/15/20 rows give offsets [0, 10, 25, 45].
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        // Range fully inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first two fragments.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range starting exactly at a fragment boundary, extending past the end.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1 references field id 43, which is NOT in the schema —
        // max_field_id() must still account for it.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // update_config upserts; delete_config_keys removes only the named keys.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}