lance_table/format/
manifest.rs

1// SPDX-License-Identifier: Apache-2.0
2// SPDX-FileCopyrightText: Copyright The Lance Authors
3
4use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use deepsize::DeepSizeOf;
11use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
12use lance_file::reader::FileReader;
13use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
14use lance_io::traits::{ProtoStruct, Reader};
15use object_store::path::Path;
16use prost::Message;
17use prost_types::Timestamp;
18
19use super::Fragment;
20use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::{Schema, StorageClass};
24use lance_core::{Error, Result};
25use lance_io::object_store::ObjectStore;
26use lance_io::utils::read_struct;
27use snafu::location;
28
/// Manifest of a dataset
///
///  * Schema
///  * Version
///  * Fragments.
///  * Indices.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Local schema, only containing fields with the default storage class (not blobs)
    pub local_schema: Schema,

    /// Dataset version
    pub version: u64,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces to build the dataset.
    ///
    /// This list is stored in order, sorted by fragment id.  However, the fragment id
    /// sequence may have gaps.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version aux data.
    pub version_aux_data: usize,

    /// The file position of the index metadata.
    pub index_section: Option<usize>,

    /// The creation timestamp with nanosecond resolution as 128-bit integer
    pub timestamp_nanos: u128,

    /// An optional string tag for this version
    pub tag: Option<String>,

    /// The reader flags
    pub reader_feature_flags: u64,

    /// The writer flags
    pub writer_feature_flags: u64,

    /// The max fragment id used so far.
    /// None means never set, Some(0) means max ID used so far is 0
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file, relative to the root of the dataset
    pub transaction_file: Option<String>,

    /// Precomputed logical offset of each fragment (plus one trailing entry holding
    /// the total row count), accelerating the fragment search using offset ranges.
    /// Kept in sync with `fragments` by the constructors and protobuf conversion.
    fragment_offsets: Vec<usize>,

    /// The max row id used so far.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// Blob dataset version
    pub blob_dataset_version: Option<u64>,
}
96
// The most significant bit of a version number marks a transaction as detached.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached-transaction bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
103
104fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
105    fragments
106        .iter()
107        .map(|f| f.num_rows().unwrap_or_default())
108        .chain([0]) // Make the last offset to be the full-length of the dataset.
109        .scan(0_usize, |offset, len| {
110            let start = *offset;
111            *offset += len;
112            Some(start)
113        })
114        .collect()
115}
116
impl Manifest {
    /// Create a brand-new manifest (version 1) for a dataset.
    ///
    /// Derives `local_schema` (default-storage-class fields only) and the
    /// fragment offset index from the inputs.  Commit-time fields (timestamp,
    /// feature flags, index section, transaction file) start zeroed/empty.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

    /// Create the next manifest version from `previous` with a new schema and
    /// fragment list.
    ///
    /// Carries forward `max_fragment_id`, `next_row_id`, the storage format,
    /// and the config.  If `new_blob_version` is None, the previous blob
    /// dataset version is retained.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None, // Caller should update index if they want to keep them.
            timestamp_nanos: 0,  // This will be set on commit
            tag: None,
            reader_feature_flags: 0, // These will be set on commit
            writer_feature_flags: 0, // These will be set on commit
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }

    /// Return the `timestamp_nanos` value as a Utc DateTime
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the 128-bit nanosecond value into whole seconds plus a
        // sub-second remainder; the remainder is < 1_000_000_000 so the
        // `as u32` cast cannot truncate.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the `timestamp_nanos` value from a Utc DateTime
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Set the `config` from an iterator
    ///
    /// Existing keys are overwritten; keys not present in `upsert_values`
    /// are left untouched.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Delete `config` keys using a slice of keys
    ///
    /// Keys not present in the config are ignored.
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the schema metadata with the given key-value pairs.
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with the given id with the given key-value pairs.
    ///
    /// If the field does not exist in the schema, this is a no-op.
    ///
    /// NOTE(review): despite the doc above, a missing field id currently
    /// returns an `invalid_input` error rather than silently succeeding.
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Check the current fragment list and update the high water mark
    ///
    /// Panics if a fragment id does not fit in a `u32` (via `try_into().unwrap()`).
    pub fn update_max_fragment_id(&mut self) {
        // If there are no fragments, don't update max_fragment_id
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // Safe because we checked fragments is not empty
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                // First time being set
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only update if the computed max is greater than current
                // This preserves the high water mark even when fragments are deleted
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id.
    /// Note this does not support recycling of fragment ids.
    ///
    /// This will return None if there are no fragments and max_fragment_id was never set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            // Return the stored high water mark
            Some(max_id.into())
        } else {
            // Not yet set, compute from fragment list
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Get the max used field id
    ///
    /// This is different than [Schema::max_field_id] because it also considers
    /// the field ids in the data files that have been dropped from the schema.
    pub fn max_field_id(&self) -> i32 {
        // -1 is used as "no fields" sentinel so max() works uniformly below.
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that are newer than the given manifest.
    /// Note this does not support recycling of fragment ids.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        // Fragments with an id above the older manifest's high water mark are new.
        // If the older manifest never assigned ids, every fragment counts.
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain the rows, identified by the offset range.
    ///
    /// Note that the offsets are the logical offsets of rows, not row IDs.
    ///
    ///
    /// Parameters
    /// ----------
    /// range: Range<usize>
    ///     Offset range
    ///
    /// Returns
    /// -------
    /// Vec<(usize, Fragment)>
    ///    A vector of `(starting_offset_of_fragment, fragment)` pairs.
    ///
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // On a miss, binary_search yields the insertion point; the fragment
        // containing `start` begins at the previous offset.  `idx - 1` cannot
        // underflow because fragment_offsets[0] == 0 <= start, so a miss
        // always has insertion point >= 1.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        // Collect fragments until one starts at or past `end` (they are
        // stored in ascending offset order).
        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses move-stable row ids.
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

    /// Creates a serialized copy of the manifest, suitable for IPC or temp storage
    /// and can be used to create a dataset
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True when the data files use the legacy (pre-v2) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}
371
/// Identity and version of the library that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Version string, expected as `major.minor.patch[.tag]` (see [`WriterVersion::semver`]).
    pub version: String,
}
377
/// Describes the on-disk format of the dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (e.g. "lance").
    pub file_format: String,
    /// File format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
383
/// File-format name recorded in manifests for Lance data files.
const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    /// Build a storage-format descriptor for the given Lance file version.
    // `resolve()` presumably maps symbolic versions (e.g. "stable") to a
    // concrete version before stringifying — TODO confirm in lance_file.
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    /// Parse the stored version string back into a [`LanceFileVersion`].
    ///
    /// Returns an error if the string is not a recognized version.
    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}
398
impl Default for DataStorageFormat {
    /// Defaults to the default Lance file version.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
404
/// Direct field-for-field conversion from the protobuf representation.
impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}
413
/// Which component of a semver-style version string to bump (see [`WriterVersion::bump`]).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// The first component (`X.minor.patch`).
    Major,
    /// The second component (`major.X.patch`).
    Minor,
    /// The third component (`major.minor.X`).
    Patch,
}
420
421impl WriterVersion {
422    /// Try to parse the version string as a semver string. Returns None if
423    /// not successful.
424    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
425        let mut parts = self.version.split('.');
426        let major = parts.next().unwrap_or("0").parse().ok()?;
427        let minor = parts.next().unwrap_or("0").parse().ok()?;
428        let patch = parts.next().unwrap_or("0").parse().ok()?;
429        let tag = parts.next();
430        Some((major, minor, patch, tag))
431    }
432
433    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
434        self.semver()
435            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
436    }
437
438    /// Return true if self is older than the given major/minor/patch
439    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
440        let version = self.semver_or_panic();
441        (version.0, version.1, version.2) < (major, minor, patch)
442    }
443
444    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
445        let parts = self.semver_or_panic();
446        let tag = if keep_tag { parts.3 } else { None };
447        let new_parts = match part {
448            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
449            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
450            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
451        };
452        let new_version = if let Some(tag) = tag {
453            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
454        } else {
455            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
456        };
457        Self {
458            library: self.library.clone(),
459            version: new_version,
460        }
461    }
462}
463
impl Default for WriterVersion {
    /// Production default: this crate's name and Cargo package version.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Unit tests always run as if they are in the next version.
    // (The patch component is bumped by one; see test_writer_version.)
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
483
/// Ties [`Manifest`] to its protobuf message type so generic helpers
/// (e.g. `read_struct` below) can (de)serialize it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
487
488impl TryFrom<pb::Manifest> for Manifest {
489    type Error = Error;
490
491    fn try_from(p: pb::Manifest) -> Result<Self> {
492        let timestamp_nanos = p.timestamp.map(|ts| {
493            let sec = ts.seconds as u128 * 1e9 as u128;
494            let nanos = ts.nanos as u128;
495            sec + nanos
496        });
497        // We only use the writer version if it is fully set.
498        let writer_version = match p.writer_version {
499            Some(pb::manifest::WriterVersion { library, version }) => {
500                Some(WriterVersion { library, version })
501            }
502            _ => None,
503        };
504        let fragments = Arc::new(
505            p.fragments
506                .into_iter()
507                .map(Fragment::try_from)
508                .collect::<Result<Vec<_>>>()?,
509        );
510        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
511        let fields_with_meta = FieldsWithMeta {
512            fields: Fields(p.fields),
513            metadata: p.metadata,
514        };
515
516        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
517            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
518        {
519            return Err(Error::Internal {
520                message: "All fragments must have row ids".into(),
521                location: location!(),
522            });
523        }
524
525        let data_storage_format = match p.data_format {
526            None => {
527                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
528                    // If there are fragments, they are a better indicator
529                    DataStorageFormat::new(inferred_version)
530                } else {
531                    // No fragments to inspect, best we can do is look at writer flags
532                    if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
533                        DataStorageFormat::new(LanceFileVersion::Stable)
534                    } else {
535                        DataStorageFormat::new(LanceFileVersion::Legacy)
536                    }
537                }
538            }
539            Some(format) => DataStorageFormat::from(format),
540        };
541
542        let schema = Schema::from(fields_with_meta);
543        let local_schema = schema.retain_storage_class(StorageClass::Default);
544
545        Ok(Self {
546            schema,
547            local_schema,
548            version: p.version,
549            writer_version,
550            version_aux_data: p.version_aux_data as usize,
551            index_section: p.index_section.map(|i| i as usize),
552            timestamp_nanos: timestamp_nanos.unwrap_or(0),
553            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
554            reader_feature_flags: p.reader_feature_flags,
555            writer_feature_flags: p.writer_feature_flags,
556            max_fragment_id: p.max_fragment_id,
557            fragments,
558            transaction_file: if p.transaction_file.is_empty() {
559                None
560            } else {
561                Some(p.transaction_file)
562            },
563            fragment_offsets,
564            next_row_id: p.next_row_id,
565            data_storage_format,
566            config: p.config,
567            blob_dataset_version: if p.blob_dataset_version == 0 {
568                None
569            } else {
570                Some(p.blob_dataset_version)
571            },
572        })
573    }
574}
575
576impl From<&Manifest> for pb::Manifest {
577    fn from(m: &Manifest) -> Self {
578        let timestamp_nanos = if m.timestamp_nanos == 0 {
579            None
580        } else {
581            let nanos = m.timestamp_nanos % 1e9 as u128;
582            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
583            Some(Timestamp {
584                seconds,
585                nanos: nanos as i32,
586            })
587        };
588        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
589        Self {
590            fields: fields_with_meta.fields.0,
591            version: m.version,
592            writer_version: m
593                .writer_version
594                .as_ref()
595                .map(|wv| pb::manifest::WriterVersion {
596                    library: wv.library.clone(),
597                    version: wv.version.clone(),
598                }),
599            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
600            metadata: fields_with_meta.metadata,
601            version_aux_data: m.version_aux_data as u64,
602            index_section: m.index_section.map(|i| i as u64),
603            timestamp: timestamp_nanos,
604            tag: m.tag.clone().unwrap_or_default(),
605            reader_feature_flags: m.reader_feature_flags,
606            writer_feature_flags: m.writer_feature_flags,
607            max_fragment_id: m.max_fragment_id,
608            transaction_file: m.transaction_file.clone().unwrap_or_default(),
609            next_row_id: m.next_row_id,
610            data_format: Some(pb::manifest::DataStorageFormat {
611                file_format: m.data_storage_format.file_format.clone(),
612                version: m.data_storage_format.version.clone(),
613            }),
614            config: m.config.clone(),
615            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
616        }
617    }
618}
619
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader without any cached schema
    ///
    /// In this case the schema will first need to be loaded
    /// from the file itself.
    ///
    /// When loading files from a dataset it is preferable to use
    /// the fragment reader to avoid this overhead.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        // Default implementation: open the object, then delegate to the
        // reader-based constructor.
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Open a self-describing file from an already-opened reader, loading the
    /// schema from the file's own embedded manifest.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
648
649#[async_trait]
650impl SelfDescribingFileReader for FileReader {
651    async fn try_new_self_described_from_reader(
652        reader: Arc<dyn Reader>,
653        cache: Option<&LanceCache>,
654    ) -> Result<Self> {
655        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
656        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
657            message: format!(
658                "Attempt to open file at {} as self-describing but it did not contain a manifest",
659                reader.path(),
660            ),
661            location: location!(),
662        })?;
663        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
664        if manifest.should_use_legacy_format() {
665            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
666        }
667        let schema = manifest.schema;
668        let max_field_id = schema.max_field_id().unwrap_or_default();
669        Self::try_new_from_reader(
670            reader.path(),
671            reader.clone(),
672            Some(metadata),
673            schema,
674            0,
675            0,
676            max_field_id,
677            cache,
678        )
679        .await
680    }
681}
682
#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // The test-build Default bumps the patch version (see `Default for
    // WriterVersion`), so assertions here compare against CARGO_PKG_VERSION
    // with patch + 1.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Unit tests run against (major,minor,patch + 1)
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        // Bumping any part must make the original strictly older.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    // Three fragments of 10/15/20 rows give logical offsets 0, 10, 25.
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        // Range fully inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first two fragments.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range extending past the end of the dataset.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Validate that max field id handles varying field ids by fragment.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1 carries field id 43, which is absent from the schema;
        // max_field_id() must still find it via the data files.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // update_config upserts; delete_config_keys removes only the named keys.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            /*blob_dataset_version= */ None,
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}