1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::reader::FileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::ObjectStore;
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// In-memory form of the dataset manifest: the root metadata object that
/// describes one version of a Lance dataset (schema, fragments, feature
/// flags, configuration, ...). Serialized to and from `pb::Manifest`.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema, covering all storage classes.
    pub schema: Schema,

    /// Schema restricted to fields with the default storage class
    /// (always derived from `schema` via `retain_storage_class`).
    pub local_schema: Schema,

    /// Dataset version number described by this manifest.
    pub version: u64,

    /// Branch name, if this version lives on a branch.
    pub branch: Option<String>,

    /// Library/version of the writer that produced this manifest, if recorded.
    pub writer_version: Option<WriterVersion>,

    /// Data fragments making up this dataset version.
    pub fragments: Arc<Vec<Fragment>>,

    /// Auxiliary version data offset in the manifest file.
    /// NOTE(review): exact semantics not visible in this file — confirm
    /// against the manifest writer.
    pub version_aux_data: usize,

    /// File position of the index metadata section, if any indices exist.
    pub index_section: Option<usize>,

    /// Creation time of this version in nanoseconds since the Unix epoch
    /// (0 means "not recorded"; see `timestamp()` / `set_timestamp()`).
    pub timestamp_nanos: u128,

    /// Tag attached to this version, if any.
    pub tag: Option<String>,

    /// Feature flags a reader must understand to read this dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write this dataset.
    pub writer_feature_flags: u64,

    /// Highest fragment id ever used, so ids of deleted fragments are not
    /// reused. `None` when unknown (older manifests); see `max_fragment_id()`.
    pub max_fragment_id: Option<u32>,

    /// Path of the transaction file that produced this version, if any.
    pub transaction_file: Option<String>,

    /// Cumulative starting row offset of each fragment, with one trailing
    /// entry holding the total row count (see `compute_fragment_offsets`).
    fragment_offsets: Vec<usize>,

    /// Next row id to assign when stable row ids are in use.
    pub next_row_id: u64,

    /// File format and version used to store the data files.
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key/value pairs.
    pub config: HashMap<String, String>,

    /// Table-level metadata key/value pairs.
    pub table_metadata: HashMap<String, String>,

    /// Version of the associated blob dataset, if one exists.
    pub blob_dataset_version: Option<u64>,

    /// Registered base paths keyed by base-path id (used by shallow clones).
    pub base_paths: HashMap<u32, BasePath>,
}
108
/// The high bit of a version number marks the version as "detached"
/// (not part of the linear version history).
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached-version marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) == DETACHED_VERSION_MASK
}
115
116fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
117 fragments
118 .iter()
119 .map(|f| f.num_rows().unwrap_or_default())
120 .chain([0]) .scan(0_usize, |offset, len| {
122 let start = *offset;
123 *offset += len;
124 Some(start)
125 })
126 .collect()
127}
128
/// Aggregate statistics over the fragments of a manifest, produced by
/// [`Manifest::summary`].
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments in the dataset version.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of recorded data-file sizes in bytes; files without a recorded
    /// size contribute 0.
    pub total_files_size: u64,
    /// Number of fragments carrying a deletion file.
    pub total_deletion_files: u64,
    /// Physical rows stored in data files (live rows plus deleted rows).
    pub total_data_file_rows: u64,
    /// Rows marked deleted, summed over deletion files that record a count.
    pub total_deletion_file_rows: u64,
    /// Live (readable) rows across all fragments.
    pub total_rows: u64,
}
139
140impl From<ManifestSummary> for BTreeMap<String, String> {
141 fn from(summary: ManifestSummary) -> Self {
142 let mut stats_map = Self::new();
143 stats_map.insert(
144 "total_fragments".to_string(),
145 summary.total_fragments.to_string(),
146 );
147 stats_map.insert(
148 "total_data_files".to_string(),
149 summary.total_data_files.to_string(),
150 );
151 stats_map.insert(
152 "total_files_size".to_string(),
153 summary.total_files_size.to_string(),
154 );
155 stats_map.insert(
156 "total_deletion_files".to_string(),
157 summary.total_deletion_files.to_string(),
158 );
159 stats_map.insert(
160 "total_data_file_rows".to_string(),
161 summary.total_data_file_rows.to_string(),
162 );
163 stats_map.insert(
164 "total_deletion_file_rows".to_string(),
165 summary.total_deletion_file_rows.to_string(),
166 );
167 stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
168 stats_map
169 }
170}
171
172impl Manifest {
173 pub fn new(
174 schema: Schema,
175 fragments: Arc<Vec<Fragment>>,
176 data_storage_format: DataStorageFormat,
177 blob_dataset_version: Option<u64>,
178 base_paths: HashMap<u32, BasePath>,
179 ) -> Self {
180 let fragment_offsets = compute_fragment_offsets(&fragments);
181 let local_schema = schema.retain_storage_class(StorageClass::Default);
182
183 Self {
184 schema,
185 local_schema,
186 version: 1,
187 branch: None,
188 writer_version: Some(WriterVersion::default()),
189 fragments,
190 version_aux_data: 0,
191 index_section: None,
192 timestamp_nanos: 0,
193 tag: None,
194 reader_feature_flags: 0,
195 writer_feature_flags: 0,
196 max_fragment_id: None,
197 transaction_file: None,
198 fragment_offsets,
199 next_row_id: 0,
200 data_storage_format,
201 config: HashMap::new(),
202 table_metadata: HashMap::new(),
203 blob_dataset_version,
204 base_paths,
205 }
206 }
207
208 pub fn new_from_previous(
209 previous: &Self,
210 schema: Schema,
211 fragments: Arc<Vec<Fragment>>,
212 new_blob_version: Option<u64>,
213 ) -> Self {
214 let fragment_offsets = compute_fragment_offsets(&fragments);
215 let local_schema = schema.retain_storage_class(StorageClass::Default);
216
217 let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);
218
219 Self {
220 schema,
221 local_schema,
222 version: previous.version + 1,
223 branch: previous.branch.clone(),
224 writer_version: Some(WriterVersion::default()),
225 fragments,
226 version_aux_data: 0,
227 index_section: None, timestamp_nanos: 0, tag: None,
230 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
233 transaction_file: None,
234 fragment_offsets,
235 next_row_id: previous.next_row_id,
236 data_storage_format: previous.data_storage_format.clone(),
237 config: previous.config.clone(),
238 table_metadata: previous.table_metadata.clone(),
239 blob_dataset_version,
240 base_paths: previous.base_paths.clone(),
241 }
242 }
243
244 pub fn shallow_clone(
249 &self,
250 ref_name: Option<String>,
251 ref_path: String,
252 ref_base_id: u32,
253 branch_name: Option<String>,
254 transaction_file: String,
255 ) -> Self {
256 let cloned_fragments = self
257 .fragments
258 .as_ref()
259 .iter()
260 .map(|fragment| {
261 let mut cloned_fragment = fragment.clone();
262 for file in &mut cloned_fragment.files {
263 if file.base_id.is_none() {
264 file.base_id = Some(ref_base_id);
265 }
266 }
267
268 if let Some(deletion) = &mut cloned_fragment.deletion_file {
269 if deletion.base_id.is_none() {
270 deletion.base_id = Some(ref_base_id);
271 }
272 }
273 cloned_fragment
274 })
275 .collect::<Vec<_>>();
276
277 Self {
278 schema: self.schema.clone(),
279 local_schema: self.local_schema.clone(),
280 version: self.version,
281 branch: branch_name,
282 writer_version: self.writer_version.clone(),
283 fragments: Arc::new(cloned_fragments),
284 version_aux_data: self.version_aux_data,
285 index_section: None, timestamp_nanos: self.timestamp_nanos,
287 tag: None,
288 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: self.max_fragment_id,
291 transaction_file: Some(transaction_file),
292 fragment_offsets: self.fragment_offsets.clone(),
293 next_row_id: self.next_row_id,
294 data_storage_format: self.data_storage_format.clone(),
295 config: self.config.clone(),
296 blob_dataset_version: self.blob_dataset_version,
297 base_paths: {
298 let mut base_paths = self.base_paths.clone();
299 let base_path = BasePath {
300 id: ref_base_id,
301 name: ref_name,
302 is_dataset_root: true,
303 path: ref_path,
304 };
305 base_paths.insert(ref_base_id, base_path);
306 base_paths
307 },
308 table_metadata: self.table_metadata.clone(),
309 }
310 }
311
312 pub fn timestamp(&self) -> DateTime<Utc> {
314 let nanos = self.timestamp_nanos % 1_000_000_000;
315 let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
316 Utc.from_utc_datetime(
317 &DateTime::from_timestamp(seconds, nanos as u32)
318 .unwrap_or_default()
319 .naive_utc(),
320 )
321 }
322
    /// Set the creation timestamp, in nanoseconds since the Unix epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Mutable access to the dataset configuration map.
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Mutable access to the table metadata map.
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Mutable access to the schema-level metadata map.
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Mutable access to the metadata of the field with the given id, or
    /// `None` when no field with that id exists in the schema.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }
351
    /// Merge the given key/value pairs into the dataset configuration,
    /// overwriting existing keys.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Remove the given keys from the dataset configuration; missing keys are
    /// ignored.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replace the schema metadata map wholesale.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replace the metadata of the field with the given id wholesale.
    ///
    /// # Errors
    /// Returns an invalid-input error when no field with `field_id` exists.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }
395
396 pub fn update_max_fragment_id(&mut self) {
398 if self.fragments.is_empty() {
400 return;
401 }
402
403 let max_fragment_id = self
404 .fragments
405 .iter()
406 .map(|f| f.id)
407 .max()
408 .unwrap() .try_into()
410 .unwrap();
411
412 match self.max_fragment_id {
413 None => {
414 self.max_fragment_id = Some(max_fragment_id);
416 }
417 Some(current_max) => {
418 if max_fragment_id > current_max {
421 self.max_fragment_id = Some(max_fragment_id);
422 }
423 }
424 }
425 }
426
427 pub fn max_fragment_id(&self) -> Option<u64> {
432 if let Some(max_id) = self.max_fragment_id {
433 Some(max_id.into())
435 } else {
436 self.fragments.iter().map(|f| f.id).max()
438 }
439 }
440
441 pub fn max_field_id(&self) -> i32 {
446 let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
447 let fragment_max_id = self
448 .fragments
449 .iter()
450 .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
451 .max()
452 .copied();
453 let fragment_max_id = fragment_max_id.unwrap_or(-1);
454 schema_max_id.max(fragment_max_id)
455 }
456
457 pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
460 if since.version >= self.version {
461 return Err(Error::io(
462 format!(
463 "fragments_since: given version {} is newer than manifest version {}",
464 since.version, self.version
465 ),
466 location!(),
467 ));
468 }
469 let start = since.max_fragment_id();
470 Ok(self
471 .fragments
472 .iter()
473 .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
474 .cloned()
475 .collect())
476 }
477
478 pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
494 let start = range.start;
495 let end = range.end;
496 let idx = self
497 .fragment_offsets
498 .binary_search(&start)
499 .unwrap_or_else(|idx| idx - 1);
500
501 let mut fragments = vec![];
502 for i in idx..self.fragments.len() {
503 if self.fragment_offsets[i] >= end
504 || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
505 <= start
506 {
507 break;
508 }
509 fragments.push((self.fragment_offsets[i], &self.fragments[i]));
510 }
511
512 fragments
513 }
514
    /// Whether this dataset uses stable row ids, as advertised by the reader
    /// feature flags.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Encode this manifest to protobuf bytes.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// Whether data files should be written with the legacy file format,
    /// based on the recorded data storage version.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
530
531 pub fn summary(&self) -> ManifestSummary {
542 let mut summary =
544 self.fragments
545 .iter()
546 .fold(ManifestSummary::default(), |mut summary, f| {
547 summary.total_data_files += f.files.len() as u64;
549 if let Some(num_rows) = f.num_rows() {
551 summary.total_rows += num_rows as u64;
552 }
553 for data_file in &f.files {
555 if let Some(size_bytes) = data_file.file_size_bytes.get() {
556 summary.total_files_size += size_bytes.get();
557 }
558 }
559 if f.deletion_file.is_some() {
561 summary.total_deletion_files += 1;
562 }
563 if let Some(deletion_file) = &f.deletion_file {
565 if let Some(num_deleted) = deletion_file.num_deleted_rows {
566 summary.total_deletion_file_rows += num_deleted as u64;
567 }
568 }
569 summary
570 });
571 summary.total_fragments = self.fragments.len() as u64;
572 summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
573
574 summary
575 }
576}
577
/// A registered storage location that data files may reference by id,
/// allowing a dataset (e.g. a shallow clone) to span multiple roots.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct BasePath {
    /// Unique id referenced by data/deletion files via `base_id`.
    pub id: u32,
    /// Optional human-readable name for this base path.
    pub name: Option<String>,
    /// Whether `path` points at the root of a Lance dataset.
    pub is_dataset_root: bool,
    /// The storage path itself.
    pub path: String,
}
585
/// Identity of the library that wrote a manifest: library name plus a
/// dotted version string (see [`WriterVersion::semver`]).
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library (e.g. "lance").
    pub library: String,
    /// Dotted version string, usually "major.minor.patch[.tag]".
    pub version: String,
}
591
/// File format and format version used to store the dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// Format name; currently always "lance" (see `LANCE_FORMAT_NAME`).
    pub file_format: String,
    /// Format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
597
/// Format name stored in [`DataStorageFormat::file_format`].
const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    /// Build a descriptor for the Lance file format at the given version.
    ///
    /// The version is resolved (aliases like "stable" become concrete
    /// numbers) before being stored as a string.
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    /// Parse the stored version string back into a [`LanceFileVersion`].
    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}
612
impl Default for DataStorageFormat {
    /// The Lance format at the default file version.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
618
impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    /// Adopt the protobuf fields verbatim (no version resolution).
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}
627
/// Which component of a semantic version to increment in
/// [`WriterVersion::bump`].
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
634
635impl WriterVersion {
636 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
639 let mut parts = self.version.split('.');
640 let major = parts.next().unwrap_or("0").parse().ok()?;
641 let minor = parts.next().unwrap_or("0").parse().ok()?;
642 let patch = parts.next().unwrap_or("0").parse().ok()?;
643 let tag = parts.next();
644 Some((major, minor, patch, tag))
645 }
646
647 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
648 self.semver()
649 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
650 }
651
652 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
654 let version = self.semver_or_panic();
655 (version.0, version.1, version.2) < (major, minor, patch)
656 }
657
658 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
659 let parts = self.semver_or_panic();
660 let tag = if keep_tag { parts.3 } else { None };
661 let new_parts = match part {
662 VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
663 VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
664 VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
665 };
666 let new_version = if let Some(tag) = tag {
667 format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
668 } else {
669 format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
670 };
671 Self {
672 library: self.library.clone(),
673 version: new_version,
674 }
675 }
676}
677
impl Default for WriterVersion {
    /// The current library as the writer: "lance" at the crate's version.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    /// In tests, report one patch version ahead of the crate version so that
    /// "newer writer" code paths are exercised.
    /// NOTE(review): both cfg branches duplicate the struct literal; only the
    /// trailing `bump` differs.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
697
// Associates Manifest with its protobuf message type so generic helpers
// (e.g. `read_struct`) can decode it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
701
impl From<pb::BasePath> for BasePath {
    /// Adopt the protobuf fields verbatim.
    fn from(p: pb::BasePath) -> Self {
        Self {
            id: p.id,
            name: p.name,
            is_dataset_root: p.is_dataset_root,
            path: p.path,
        }
    }
}
712
713impl TryFrom<pb::Manifest> for Manifest {
714 type Error = Error;
715
716 fn try_from(p: pb::Manifest) -> Result<Self> {
717 let timestamp_nanos = p.timestamp.map(|ts| {
718 let sec = ts.seconds as u128 * 1e9 as u128;
719 let nanos = ts.nanos as u128;
720 sec + nanos
721 });
722 let writer_version = match p.writer_version {
724 Some(pb::manifest::WriterVersion { library, version }) => {
725 Some(WriterVersion { library, version })
726 }
727 _ => None,
728 };
729 let fragments = Arc::new(
730 p.fragments
731 .into_iter()
732 .map(Fragment::try_from)
733 .collect::<Result<Vec<_>>>()?,
734 );
735 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
736 let fields_with_meta = FieldsWithMeta {
737 fields: Fields(p.fields),
738 metadata: p.schema_metadata,
739 };
740
741 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
742 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
743 {
744 return Err(Error::Internal {
745 message: "All fragments must have row ids".into(),
746 location: location!(),
747 });
748 }
749
750 let data_storage_format = match p.data_format {
751 None => {
752 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
753 DataStorageFormat::new(inferred_version)
755 } else {
756 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
758 DataStorageFormat::new(LanceFileVersion::Stable)
759 } else {
760 DataStorageFormat::new(LanceFileVersion::Legacy)
761 }
762 }
763 }
764 Some(format) => DataStorageFormat::from(format),
765 };
766
767 let schema = Schema::from(fields_with_meta);
768 let local_schema = schema.retain_storage_class(StorageClass::Default);
769
770 Ok(Self {
771 schema,
772 local_schema,
773 version: p.version,
774 branch: p.branch,
775 writer_version,
776 version_aux_data: p.version_aux_data as usize,
777 index_section: p.index_section.map(|i| i as usize),
778 timestamp_nanos: timestamp_nanos.unwrap_or(0),
779 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
780 reader_feature_flags: p.reader_feature_flags,
781 writer_feature_flags: p.writer_feature_flags,
782 max_fragment_id: p.max_fragment_id,
783 fragments,
784 transaction_file: if p.transaction_file.is_empty() {
785 None
786 } else {
787 Some(p.transaction_file)
788 },
789 fragment_offsets,
790 next_row_id: p.next_row_id,
791 data_storage_format,
792 config: p.config,
793 table_metadata: p.table_metadata,
794 blob_dataset_version: if p.blob_dataset_version == 0 {
795 None
796 } else {
797 Some(p.blob_dataset_version)
798 },
799 base_paths: p
800 .base_paths
801 .iter()
802 .map(|item| (item.id, item.clone().into()))
803 .collect(),
804 })
805 }
806}
807
808impl From<&Manifest> for pb::Manifest {
809 fn from(m: &Manifest) -> Self {
810 let timestamp_nanos = if m.timestamp_nanos == 0 {
811 None
812 } else {
813 let nanos = m.timestamp_nanos % 1e9 as u128;
814 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
815 Some(Timestamp {
816 seconds,
817 nanos: nanos as i32,
818 })
819 };
820 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
821 Self {
822 fields: fields_with_meta.fields.0,
823 schema_metadata: m
824 .schema
825 .metadata
826 .iter()
827 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
828 .collect(),
829 version: m.version,
830 branch: m.branch.clone(),
831 writer_version: m
832 .writer_version
833 .as_ref()
834 .map(|wv| pb::manifest::WriterVersion {
835 library: wv.library.clone(),
836 version: wv.version.clone(),
837 }),
838 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
839 table_metadata: m.table_metadata.clone(),
840 version_aux_data: m.version_aux_data as u64,
841 index_section: m.index_section.map(|i| i as u64),
842 timestamp: timestamp_nanos,
843 tag: m.tag.clone().unwrap_or_default(),
844 reader_feature_flags: m.reader_feature_flags,
845 writer_feature_flags: m.writer_feature_flags,
846 max_fragment_id: m.max_fragment_id,
847 transaction_file: m.transaction_file.clone().unwrap_or_default(),
848 next_row_id: m.next_row_id,
849 data_format: Some(pb::manifest::DataStorageFormat {
850 file_format: m.data_storage_format.file_format.clone(),
851 version: m.data_storage_format.version.clone(),
852 }),
853 config: m.config.clone(),
854 blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
855 base_paths: m
856 .base_paths
857 .values()
858 .map(|base_path| pb::BasePath {
859 id: base_path.id,
860 name: base_path.name.clone(),
861 is_dataset_root: base_path.is_dataset_root,
862 path: base_path.path.clone(),
863 })
864 .collect(),
865 }
866 }
867}
868
/// Readers for Lance files that embed their own manifest ("self-describing"
/// files), so they can be opened without an externally supplied schema.
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open the file at `path` from `object_store` and derive its schema from
    /// the manifest stored inside the file itself.
    ///
    /// An optional `cache` is used for metadata lookups.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Same as [`Self::try_new_self_described`], but for an already-opened
    /// reader.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
897
#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        // Locate the embedded manifest; a file without one is not
        // self-describing and cannot be opened this way.
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Legacy-format files keep dictionary values outside the manifest;
        // load them into the schema before use.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
931
932#[cfg(test)]
933mod tests {
934 use crate::format::{DataFile, DeletionFile, DeletionFileType};
935 use std::num::NonZero;
936
937 use super::*;
938
939 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
940 use lance_core::datatypes::Field;
941
    // Default writer version parsing/bumping. Under cfg(test), Default bumps
    // the crate's patch version by one, which the assertions account for.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // +1 because the test Default bumps the patch component.
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        // Bumping any component must make the original strictly older.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }
967
    // Offset-range lookup over fragments of 10, 15, and 20 rows
    // (row ranges 0..10, 10..25, 25..45).
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Entirely within the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Straddles fragments 0 and 1.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Starts exactly at fragment 1 and runs past the end of the data.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges entirely past the data yield nothing.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
1014
    // `max_field_id` must consider both the schema and the field lists of all
    // data files: file "path2" references field 43, beyond the schema's max.
    #[test]
    fn test_max_field_id() {
        // A schema with field ids 0 and 2 (1 is intentionally skipped).
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }
1059
    // Inserting and removing keys via `config_mut` is reflected in `config`.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Build the expected map alongside the mutations.
        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }
1092
1093 #[test]
1094 fn test_manifest_summary() {
1095 let arrow_schema = ArrowSchema::new(vec![
1097 ArrowField::new("id", arrow_schema::DataType::Int64, false),
1098 ArrowField::new("name", arrow_schema::DataType::Utf8, true),
1099 ]);
1100 let schema = Schema::try_from(&arrow_schema).unwrap();
1101
1102 let empty_manifest = Manifest::new(
1103 schema.clone(),
1104 Arc::new(vec![]),
1105 DataStorageFormat::default(),
1106 None,
1107 HashMap::new(),
1108 );
1109
1110 let empty_summary = empty_manifest.summary();
1111 assert_eq!(empty_summary.total_rows, 0);
1112 assert_eq!(empty_summary.total_files_size, 0);
1113 assert_eq!(empty_summary.total_fragments, 0);
1114 assert_eq!(empty_summary.total_data_files, 0);
1115 assert_eq!(empty_summary.total_deletion_file_rows, 0);
1116 assert_eq!(empty_summary.total_data_file_rows, 0);
1117 assert_eq!(empty_summary.total_deletion_files, 0);
1118
1119 let empty_fragments = vec![
1121 Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
1122 Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
1123 ];
1124
1125 let empty_files_manifest = Manifest::new(
1126 schema.clone(),
1127 Arc::new(empty_fragments),
1128 DataStorageFormat::default(),
1129 None,
1130 HashMap::new(),
1131 );
1132
1133 let empty_files_summary = empty_files_manifest.summary();
1134 assert_eq!(empty_files_summary.total_rows, 0);
1135 assert_eq!(empty_files_summary.total_files_size, 0);
1136 assert_eq!(empty_files_summary.total_fragments, 2);
1137 assert_eq!(empty_files_summary.total_data_files, 2);
1138 assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
1139 assert_eq!(empty_files_summary.total_data_file_rows, 0);
1140 assert_eq!(empty_files_summary.total_deletion_files, 0);
1141
1142 let real_fragments = vec![
1144 Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
1145 Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
1146 Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
1147 ];
1148
1149 let real_data_manifest = Manifest::new(
1150 schema.clone(),
1151 Arc::new(real_fragments),
1152 DataStorageFormat::default(),
1153 None,
1154 HashMap::new(),
1155 );
1156
1157 let real_data_summary = real_data_manifest.summary();
1158 assert_eq!(real_data_summary.total_rows, 425); assert_eq!(real_data_summary.total_files_size, 0); assert_eq!(real_data_summary.total_fragments, 3);
1161 assert_eq!(real_data_summary.total_data_files, 3);
1162 assert_eq!(real_data_summary.total_deletion_file_rows, 0);
1163 assert_eq!(real_data_summary.total_data_file_rows, 425);
1164 assert_eq!(real_data_summary.total_deletion_files, 0);
1165
1166 let file_version = LanceFileVersion::default();
1167 let mut fragment_with_deletion = Fragment::new(0)
1169 .with_file(
1170 "data_with_deletion.lance",
1171 vec![0, 1],
1172 vec![0, 1],
1173 &file_version,
1174 NonZero::new(1000),
1175 )
1176 .with_physical_rows(50);
1177 fragment_with_deletion.deletion_file = Some(DeletionFile {
1178 read_version: 123,
1179 id: 456,
1180 file_type: DeletionFileType::Array,
1181 num_deleted_rows: Some(10),
1182 base_id: None,
1183 });
1184
1185 let manifest_with_deletion = Manifest::new(
1186 schema,
1187 Arc::new(vec![fragment_with_deletion]),
1188 DataStorageFormat::default(),
1189 None,
1190 HashMap::new(),
1191 );
1192
1193 let deletion_summary = manifest_with_deletion.summary();
1194 assert_eq!(deletion_summary.total_rows, 40); assert_eq!(deletion_summary.total_files_size, 1000);
1196 assert_eq!(deletion_summary.total_fragments, 1);
1197 assert_eq!(deletion_summary.total_data_files, 1);
1198 assert_eq!(deletion_summary.total_deletion_file_rows, 10);
1199 assert_eq!(deletion_summary.total_data_file_rows, 50);
1200 assert_eq!(deletion_summary.total_deletion_files, 1);
1201
1202 let stats_map: BTreeMap<String, String> = deletion_summary.into();
1204 assert_eq!(stats_map.len(), 7)
1205 }
1206}