use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;

/// Manifest of a single dataset version: the schema, the fragments that make up
/// the dataset, feature flags, and the bookkeeping needed to locate indices and
/// auxiliary data.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Dataset schema, restricted to fields with the default storage class.
    pub local_schema: Schema,

    /// Dataset version.
    pub version: u64,

    /// Version of the writer library that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces that make up the dataset.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version auxiliary data.
    pub version_aux_data: usize,

    /// The file position of the index metadata section, if any.
    pub index_section: Option<usize>,

    /// The creation timestamp, in nanoseconds since the Unix epoch.
    pub timestamp_nanos: u128,

    /// An optional tag for this version.
    pub tag: Option<String>,

    /// Feature flags a reader must understand to read this dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write this dataset.
    pub writer_feature_flags: u64,

    /// The highest fragment id used so far.
    pub max_fragment_id: Option<u32>,

    /// The path to the transaction file that produced this version, if known.
    pub transaction_file: Option<String>,

    /// Precomputed starting row offset of each fragment, used to speed up
    /// lookups by row-offset range.
    fragment_offsets: Vec<usize>,

    /// The next row id to be assigned.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Table configuration.
    pub config: HashMap<String, String>,

    /// The version of the associated blob dataset, if any.
    pub blob_dataset_version: Option<u64>,

    /// Base paths that data files may reference, keyed by base path id.
    pub base_paths: HashMap<u32, BasePath>,
}

/// Bit set in the version number of versions that are detached from the main
/// (linear) version history.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// True if the given version number refers to a detached version.
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}

/// Compute the starting row offset of each fragment, plus one trailing entry
/// equal to the total number of rows (e.g. fragments with 10, 15, and 20 rows
/// yield `[0, 10, 25, 45]`).
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        .chain([0]) // Sentinel so the final offset equals the full dataset length.
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

impl Manifest {
    /// Create a new manifest (version 1) for a dataset with the given schema and
    /// fragments.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    /// Create a manifest for the next version of the dataset, based on a previous
    /// manifest.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Create a shallow clone of this manifest: data files and deletion files are
    /// not copied, but are referenced through a new base path entry that points at
    /// the source dataset.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None,
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath {
                    id: ref_base_id,
                    name: ref_name,
                    is_dataset_root: true,
                    path: ref_path,
                };
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
        }
    }

    /// The creation timestamp as a UTC [`DateTime`].
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the creation timestamp, in nanoseconds since the Unix epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Insert (or overwrite) the given key-value pairs in the table config.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Remove the given keys from the table config.
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replace the schema metadata with the provided map.
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replace the metadata of the field with the given id.
    ///
    /// Returns an error if no field with that id exists.
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Recompute `max_fragment_id` from the current fragment list. The value only
    /// grows; it is left unchanged if the fragments' maximum id is smaller.
    pub fn update_max_fragment_id(&mut self) {
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap()
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the max fragment id, falling back to scanning the fragment list when
    /// the `max_fragment_id` field has not been set.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            Some(max_id.into())
        } else {
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Return the highest field id present in either the schema or any data file,
    /// or -1 if there are no fields at all.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments that were added after the given (older) manifest.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Return the fragments that overlap the given logical row-offset range, paired
    /// with each fragment's starting row offset.
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the stable row id feature is enabled for this dataset.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Serialize the manifest to bytes using its protobuf representation.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}

/// A base path that data files and deletion files can reference by id.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct BasePath {
    pub id: u32,
    pub name: Option<String>,
    pub is_dataset_root: bool,
    pub path: String,
}

/// The library and version that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

/// The file format and format version used for the data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
    /// Try to parse the version string as `major.minor.patch`, with an optional
    /// fourth dot-separated component treated as a tag.
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// True if this version is strictly older than the given version.
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }

    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // In unit tests the default writer version is bumped by one patch version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl From<pb::BasePath> for BasePath {
    fn from(p: pb::BasePath) -> Self {
        Self {
            id: p.id,
            name: p.name,
            is_dataset_root: p.is_dataset_root,
            path: p.path,
        }
    }
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1_000_000_000;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                // No explicit storage format: infer it from the data files, then
                // fall back to the writer feature flags.
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
            base_paths: p
                .base_paths
                .iter()
                .map(|item| (item.id, item.clone().into()))
                .collect(),
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1_000_000_000;
            let seconds = ((m.timestamp_nanos - nanos) / 1_000_000_000) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
            base_paths: m
                .base_paths
                .values()
                .map(|base_path| pb::BasePath {
                    id: base_path.id,
                    name: base_path.name.clone(),
                    is_dataset_root: base_path.is_dataset_root,
                    path: base_path.path.clone(),
                })
                .collect(),
        }
    }
}

#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a reader for a Lance file that carries its own schema (a
    /// self-describing file), loading the schema from the file's manifest.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

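    // Illustrative check (added sketch, not exhaustive): versions with the
    // detached bit set are reported as detached.
    #[test]
    fn test_is_detached_version() {
        assert!(!is_detached_version(5));
        assert!(is_detached_version(DETACHED_VERSION_MASK | 5));
    }
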
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // The test-only `Default` impl bumps the patch version by one.
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

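    // Illustrative check (added sketch): the optional fourth dot-separated
    // component of a writer version is treated as a tag and can be kept or
    // dropped when bumping.
    #[test]
    fn test_writer_version_tag() {
        let wv = WriterVersion {
            library: "lance".to_string(),
            version: "0.1.2.beta1".to_string(),
        };
        assert_eq!(wv.semver(), Some((0, 1, 2, Some("beta1"))));
        assert_eq!(wv.bump(VersionPart::Patch, true).version, "0.1.3.beta1");
        assert_eq!(wv.bump(VersionPart::Patch, false).version, "0.1.3");
    }
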
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges past the end of the dataset return no fragments.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

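    // Illustrative test (added sketch): `max_fragment_id()` falls back to scanning
    // the fragment list until `update_max_fragment_id()` records the value.
    #[test]
    fn test_update_max_fragment_id() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(3, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(7, "path2", &schema, Some(15)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // The field starts out unset, so the getter scans the fragments.
        assert_eq!(manifest.max_fragment_id, None);
        assert_eq!(manifest.max_fragment_id(), Some(7));

        manifest.update_max_fragment_id();
        assert_eq!(manifest.max_fragment_id, Some(7));
    }
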
    #[test]
    fn test_max_field_id() {
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

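    // Illustrative test (added sketch): `set_timestamp` stores nanoseconds since
    // the Unix epoch and `timestamp()` converts them back to a UTC datetime.
    #[test]
    fn test_timestamp_roundtrip() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let mut manifest = Manifest::new(
            schema,
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let nanos: u128 = 1_700_000_000 * 1_000_000_000 + 123;
        manifest.set_timestamp(nanos);
        assert_eq!(manifest.timestamp_nanos, nanos);

        let ts = manifest.timestamp();
        assert_eq!(ts.timestamp(), 1_700_000_000);
        assert_eq!(ts.timestamp_subsec_nanos(), 123);
    }
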
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
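
    // Illustrative test (added sketch): `fragments_since` returns only the
    // fragments added after the older manifest, and rejects a newer "since".
    #[test]
    fn test_fragments_since() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let v1_fragments = vec![Fragment::with_file_legacy(0, "path1", &schema, Some(10))];
        let mut v1 = Manifest::new(
            schema.clone(),
            Arc::new(v1_fragments.clone()),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );
        v1.update_max_fragment_id();

        let mut v2_fragments = v1_fragments;
        v2_fragments.push(Fragment::with_file_legacy(1, "path2", &schema, Some(20)));
        let v2 = Manifest::new_from_previous(&v1, schema, Arc::new(v2_fragments), None);

        let added = v2.fragments_since(&v1).unwrap();
        assert_eq!(added.len(), 1);
        assert_eq!(added[0].id, 1);

        // Asking for fragments "since" a newer manifest is an error.
        assert!(v1.fragments_since(&v2).is_err());
    }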
}