// lance_table/format/manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
11use lance_file::reader::FileReader;
12use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
13use lance_io::traits::{ProtoStruct, Reader};
14use object_store::path::Path;
15use prost::Message;
16use prost_types::Timestamp;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::FileMetadataCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::ObjectStore;
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// Manifest of a dataset
///
/// The manifest is the root metadata object of a dataset version. It records:
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Version of the writer library that wrote this manifest.
    ///
    /// `None` when the manifest was written by a version of the library that
    /// predates this field (see `TryFrom<pb::Manifest>`).
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    ///
    /// `None` if this version has no index metadata section.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    ///
    /// A value of 0 means "not set" (see `From<&Manifest> for pb::Manifest`).
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far
    pub max_fragment_id: u32,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logic offset of each fragment
    /// accelerating the fragment search using offset ranges.
    ///
    /// Always contains one entry per fragment plus a trailing entry holding the
    /// total row count (see `compute_fragment_offsets`).
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,
}
94
// We use the most significant bit to indicate that a transaction is detached
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached marker (its MSB is set).
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
101
102fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
103    fragments
104        .iter()
105        .map(|f| f.num_rows().unwrap_or_default())
106        .chain([0]) // Make the last offset to be the full-length of the dataset.
107        .scan(0_usize, |offset, len| {
108            let start = *offset;
109            *offset += len;
110            Some(start)
111        })
112        .collect()
113}
114
impl Manifest {
    /// Create a brand-new manifest (version 1) for a fresh dataset.
    ///
    /// All commit-time fields (timestamp, feature flags, index section, ...)
    /// start zeroed/empty; they are filled in later during commit.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // The local schema excludes non-default storage classes (e.g. blobs).
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: 0,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

    /// Create the manifest for the next dataset version, derived from `previous`.
    ///
    /// Carries forward `max_fragment_id`, `next_row_id`, the storage format,
    /// and the config; bumps the version by one. Commit-time fields are reset
    /// (see inline comments).
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Keep the previous blob dataset version unless explicitly replaced.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the 128-bit nanosecond count into whole seconds plus the
        // sub-second nanosecond remainder.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        // Out-of-range timestamps fall back to the Unix epoch (unwrap_or_default).
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value (nanoseconds since the Unix epoch)
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Set the `config` from an iterator
    ///
    /// Existing keys are overwritten; other keys are left untouched.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    ///
    /// Keys not present in the config are silently ignored.
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
        }
    }

    /// Check the current fragment list and update the high water mark
    ///
    /// NOTE(review): the `try_into().unwrap()` panics if a fragment id exceeds
    /// `u32::MAX` — presumably ids always fit in u32; confirm with writers.
    pub fn update_max_fragment_id(&mut self) {
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap_or_default()
            .try_into()
            .unwrap();

        // Only ever move the high water mark forward.
        if max_fragment_id > self.max_fragment_id {
            self.max_fragment_id = max_fragment_id;
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if self.max_fragment_id == 0 {
            // It might not have been updated, so the best we can do is recompute
            // it from the fragment list.
            self.fragments.iter().map(|f| f.id).max()
        } else {
            Some(self.max_fragment_id.into())
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        // -1 is the "no fields" sentinel for both sources.
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    ///
    /// Errors if `since` is not actually an older version than `self`.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        // If `since` had no fragments at all, every fragment is "new".
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Locate the first candidate fragment: either the one starting exactly
        // at `start`, or (on a miss) the one whose span contains `start`.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop once fragments begin at/after `end`, or (for the first
            // candidate) skip past a fragment ending at/before `start`.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses move-stable row ids.
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True when the data files were written with the legacy (pre-v2) format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}
341
/// Identifies the library (and its version) that wrote a manifest.
#[derive(Debug, Clone, PartialEq)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Version string, expected to be dot-separated semver-like
    /// (`major.minor.patch` with an optional fourth `.tag` segment).
    pub version: String,
}
347
/// Describes the on-disk format of the dataset's data files.
#[derive(Debug, Clone, PartialEq)]
pub struct DataStorageFormat {
    /// File format name (currently always "lance"; see `LANCE_FORMAT_NAME`).
    pub file_format: String,
    /// File format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
353
// The only file format name currently written into manifests.
const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    /// Create a storage-format descriptor for the given Lance file version.
    ///
    /// The version is passed through `resolve()` before being stringified —
    /// presumably mapping aliases (e.g. "stable") to a concrete version;
    /// confirm against `lance_file::version`.
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    /// Parse the stored version string back into a [`LanceFileVersion`].
    ///
    /// Errors if the stored string is not a recognized version.
    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}
368
369impl Default for DataStorageFormat {
370    fn default() -> Self {
371        Self::new(LanceFileVersion::default())
372    }
373}
374
375impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
376    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
377        Self {
378            file_format: pb.file_format,
379            version: pb.version,
380        }
381    }
382}
383
/// Which component of a semver-like version string to bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
390
391impl WriterVersion {
392    /// Try to parse the version string as a semver string. Returns None if
393    /// not successful.
394    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
395        let mut parts = self.version.split('.');
396        let major = parts.next().unwrap_or("0").parse().ok()?;
397        let minor = parts.next().unwrap_or("0").parse().ok()?;
398        let patch = parts.next().unwrap_or("0").parse().ok()?;
399        let tag = parts.next();
400        Some((major, minor, patch, tag))
401    }
402
403    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
404        self.semver()
405            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
406    }
407
408    /// Return true if self is older than the given major/minor/patch
409    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
410        let version = self.semver_or_panic();
411        (version.0, version.1, version.2) < (major, minor, patch)
412    }
413
414    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
415        let parts = self.semver_or_panic();
416        let tag = if keep_tag { parts.3 } else { None };
417        let new_parts = match part {
418            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
419            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
420            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
421        };
422        let new_version = if let Some(tag) = tag {
423            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
424        } else {
425            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
426        };
427        Self {
428            library: self.library.clone(),
429            version: new_version,
430        }
431    }
432}
433
impl Default for WriterVersion {
    /// Production default: this crate's name and compiled-in package version.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next version.
    // (The patch component is bumped so "older_than" comparisons against the
    // current release behave deterministically in tests.)
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
453
// Associates Manifest with its protobuf message type so generic helpers
// (e.g. `read_struct`) know how to decode it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
457
impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    /// Decode a manifest from its protobuf representation.
    ///
    /// Validates the stable-row-id invariant and infers the data storage
    /// format when the manifest predates that field.
    fn try_from(p: pb::Manifest) -> Result<Self> {
        // Combine protobuf (seconds, nanos) into a single u128 nanosecond count.
        // NOTE(review): `ts.seconds as u128` wraps for negative (pre-epoch)
        // seconds, and `1e9 as u128` is a float-literal cast of an integer
        // constant — presumably timestamps are always post-epoch; confirm.
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        // We only use the writer version if it is fully set.
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        // If the stable-row-id feature is enabled, every fragment must carry
        // row id metadata — otherwise the manifest is internally inconsistent.
        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        // Older manifests lack an explicit data format; infer one.
        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    // If there are fragments, they are a better indicator
                    DataStorageFormat::new(inferred_version)
                } else {
                    // No fragments to inspect, best we can do is look at writer flags
                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                        DataStorageFormat::new(LanceFileVersion::Stable)
                    } else {
                        DataStorageFormat::new(LanceFileVersion::Legacy)
                    }
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            fragments,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            // Protobuf encodes "unset" strings/ints as empty/zero; map those
            // back to None here and below.
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
        })
    }
}
545
546impl From<&Manifest> for pb::Manifest {
547    fn from(m: &Manifest) -> Self {
548        let timestamp_nanos = if m.timestamp_nanos == 0 {
549            None
550        } else {
551            let nanos = m.timestamp_nanos % 1e9 as u128;
552            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
553            Some(Timestamp {
554                seconds,
555                nanos: nanos as i32,
556            })
557        };
558        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
559        Self {
560            fields: fields_with_meta.fields.0,
561            version: m.version,
562            writer_version: m
563                .writer_version
564                .as_ref()
565                .map(|wv| pb::manifest::WriterVersion {
566                    library: wv.library.clone(),
567                    version: wv.version.clone(),
568                }),
569            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
570            metadata: fields_with_meta.metadata,
571            version_aux_data: m.version_aux_data as u64,
572            index_section: m.index_section.map(|i| i as u64),
573            timestamp: timestamp_nanos,
574            tag: m.tag.clone().unwrap_or_default(),
575            reader_feature_flags: m.reader_feature_flags,
576            writer_feature_flags: m.writer_feature_flags,
577            max_fragment_id: m.max_fragment_id,
578            transaction_file: m.transaction_file.clone().unwrap_or_default(),
579            next_row_id: m.next_row_id,
580            data_format: Some(pb::manifest::DataStorageFormat {
581                file_format: m.data_storage_format.file_format.clone(),
582                version: m.data_storage_format.version.clone(),
583            }),
584            config: m.config.clone(),
585            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
586        }
587    }
588}
589
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    ///
    /// Default implementation: open the object at `path` and delegate to
    /// [`Self::try_new_self_described_from_reader`].
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Open a self-describing file from an already-opened reader.
    ///
    /// `cache`, when provided, is used for file metadata lookups.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
618
#[async_trait]
impl SelfDescribingFileReader for FileReader {
    /// Open a legacy Lance file whose manifest (and thus schema) is embedded
    /// in the file itself.
    ///
    /// Errors if the file metadata has no manifest position, i.e. the file is
    /// not self-describing.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        // Decode the embedded manifest, then load any dictionary values the
        // schema references from elsewhere in the file.
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
650
#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // Exercises semver parsing, bumping, and ordering of WriterVersion.
    // Note: the test-build Default bumps the patch version (see
    // `impl Default for WriterVersion`), which this test relies on.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major,minor,patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        // Bumping any part must produce a strictly newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    // Fragments of 10, 15, and 20 rows give logical offsets [0, 10, 25, 45].
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        // Range fully inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first/second fragment boundary.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range starting exactly at a fragment boundary and running past the end.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1 references field id 43, which is not in the schema —
        // max_field_id() must still see it.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // Round-trips config upserts and deletions through the manifest.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        let mut config = HashMap::new();
        config.insert("lance:test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}
807}