use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;

/// Manifest of a dataset version.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// The schema restricted to fields with the default storage class
    /// (i.e. excluding fields stored externally, such as blob columns).
    pub local_schema: Schema,

    /// Dataset version.
    pub version: u64,

    /// Version of the writer library that committed this manifest, if known.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces that make up the dataset.
    pub fragments: Arc<Vec<Fragment>>,

    /// Position of the version auxiliary data.
    pub version_aux_data: usize,

    /// File position of the index metadata section, if any.
    pub index_section: Option<usize>,

    /// Creation timestamp of this version, in nanoseconds since the UNIX epoch.
    pub timestamp_nanos: u128,

    pub tag: Option<String>,

    /// Feature flags a reader must understand to read this dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write to this dataset.
    pub writer_feature_flags: u64,

    /// The highest fragment id used so far, if any.
    pub max_fragment_id: Option<u32>,

    /// Path of the transaction file, if any.
    pub transaction_file: Option<String>,

    /// Precomputed starting row offset of each fragment, with the total row
    /// count appended, used to map row offsets back to fragments.
    fragment_offsets: Vec<usize>,

    /// The next unused row id (used when stable row ids are enabled).
    pub next_row_id: u64,

    /// The data file format and version used to store the dataset.
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key-value pairs.
    pub config: HashMap<String, String>,

    /// Version of the associated blob dataset, if one exists.
    pub blob_dataset_version: Option<u64>,

    /// Registered base paths, keyed by id, for data stored outside the
    /// dataset root (e.g. after a shallow clone).
    pub base_paths: HashMap<u32, BasePath>,
}

/// Bit set on a version number to mark it as a detached version, i.e. one that
/// is not part of the main commit history.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}

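// For illustration: for fragments with 10, 15, and 20 rows, the helper below
// returns [0, 10, 25, 45]. The extra `.chain([0])` element makes the final
// entry the total row count across all fragments.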
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        .chain([0])
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

impl Manifest {
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Create a shallow clone of this manifest, registering `ref_path` as a new
    /// base path and re-pointing all data and deletion files at it.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        transaction_file: String,
    ) -> Self {
        let new_base_id = self.base_paths.keys().max().map(|id| *id + 1).unwrap_or(0);
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                cloned_fragment.files = cloned_fragment
                    .files
                    .into_iter()
                    .map(|mut file| {
                        file.base_id = Some(new_base_id);
                        file
                    })
                    .collect();

                if let Some(mut deletion) = cloned_fragment.deletion_file.take() {
                    deletion.base_id = Some(new_base_id);
                    cloned_fragment.deletion_file = Some(deletion);
                }

                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None,
            timestamp_nanos: self.timestamp_nanos,
            reader_feature_flags: self.reader_feature_flags,
            tag: None,
            writer_feature_flags: self.writer_feature_flags,
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath {
                    id: new_base_id,
                    name: ref_name,
                    is_dataset_root: true,
                    path: ref_path,
                };
                base_paths.insert(new_base_id, base_path);
                base_paths
            },
        }
    }

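    /// Creation timestamp of this version as a UTC [`DateTime`].
    ///
    /// For example, a `timestamp_nanos` of 1_700_000_000_500_000_000 splits into
    /// 1_700_000_000 seconds and 500_000_000 nanoseconds before conversion.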
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Bring `max_fragment_id` up to date with the largest fragment id present.
    /// The tracked maximum never decreases.
    pub fn update_max_fragment_id(&mut self) {
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap()
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// The highest fragment id used so far, falling back to scanning the
    /// current fragments when the field has not been recorded.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            Some(max_id.into())
        } else {
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// The highest field id across the schema and all data files.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that were added after the given (older) manifest.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that overlap the given range of row offsets, returning
    /// each fragment together with its starting row offset.
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the stable row id feature is enabled for this dataset.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Serialize the manifest to its protobuf wire format.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct BasePath {
    pub id: u32,
    pub name: Option<String>,
    pub is_dataset_root: bool,
    pub path: String,
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }

    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl From<pb::BasePath> for BasePath {
    fn from(p: pb::BasePath) -> Self {
        Self {
            id: p.id,
            name: p.name,
            is_dataset_root: p.is_dataset_root,
            path: p.path,
        }
    }
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
            base_paths: p
                .base_paths
                .iter()
                .map(|item| (item.id, item.clone().into()))
                .collect(),
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1e9 as u128;
            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
            base_paths: m
                .base_paths
                .values()
                .map(|base_path| pb::BasePath {
                    id: base_path.id,
                    name: base_path.name.clone(),
                    is_dataset_root: base_path.is_dataset_root,
                    path: base_path.path.clone(),
                })
                .collect(),
        }
    }
}

/// A reader for Lance files that store their own schema, as a serialized
/// manifest, in the file itself.
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a file reader on the given path, reading the schema from the
    /// file's own manifest.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // The test cfg of `WriterVersion::default` bumps the patch version.
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }
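
    // Illustrative check of how `WriterVersion::semver` and `bump` treat a
    // fourth dotted component as a tag.
    #[test]
    fn test_writer_version_semver_tag() {
        let wv = WriterVersion {
            library: "lance".to_string(),
            version: "0.10.1".to_string(),
        };
        assert_eq!(wv.semver(), Some((0, 10, 1, None)));
        assert!(wv.older_than(0, 10, 2));

        let tagged = WriterVersion {
            library: "lance".to_string(),
            version: "0.10.1.beta1".to_string(),
        };
        assert_eq!(tagged.semver(), Some((0, 10, 1, Some("beta1"))));
        assert_eq!(
            tagged.bump(VersionPart::Patch, true).version,
            "0.10.2.beta1"
        );
    }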

    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
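
    // Illustrative sketch of `fragments_since`: only fragments whose id is greater
    // than the previous manifest's max fragment id are reported, and asking with a
    // newer "since" manifest is an error.
    #[test]
    fn test_fragments_since() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut old_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![Fragment::with_file_legacy(0, "path1", &schema, Some(10))]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );
        old_manifest.update_max_fragment_id();

        let new_fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
        ];
        let new_manifest =
            Manifest::new_from_previous(&old_manifest, schema, Arc::new(new_fragments), None);

        let added = new_manifest.fragments_since(&old_manifest).unwrap();
        assert_eq!(added.len(), 1);
        assert_eq!(added[0].id, 1);

        assert!(old_manifest.fragments_since(&new_manifest).is_err());
    }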

    #[test]
    fn test_max_field_id() {
        // A schema whose fields have ids 0 and 2.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }
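
    // Illustrative sketch of `update_max_fragment_id`: the tracked maximum follows
    // the largest fragment id present in the manifest.
    #[test]
    fn test_update_max_fragment_id() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(3, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(7, "path2", &schema, Some(15)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_fragment_id, None);
        manifest.update_max_fragment_id();
        assert_eq!(manifest.max_fragment_id, Some(7));
    }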

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}