1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::reader::FileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// A manifest: the root metadata for one version of a dataset.
///
/// It records the schema, the data fragments, feature flags, and the
/// bookkeeping needed to read and evolve the dataset.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// The full dataset schema, including all storage classes.
    pub schema: Schema,

    /// The schema restricted to fields with the default storage class
    /// (derived via `retain_storage_class` in the constructors).
    pub local_schema: Schema,

    /// Version number of this manifest; starts at 1 in `new` and is
    /// incremented by `new_from_previous`.
    pub version: u64,

    /// Branch this version belongs to, if any.
    pub branch: Option<String>,

    /// Library name and version string that wrote this manifest, when known.
    pub writer_version: Option<WriterVersion>,

    /// The data fragments making up the dataset at this version.
    pub fragments: Arc<Vec<Fragment>>,

    // Mirrors the protobuf `version_aux_data` field.
    // NOTE(review): exact semantics are not visible in this file — confirm
    // against the manifest protobuf definition.
    pub version_aux_data: usize,

    /// Location of the index section, when one was written
    /// (mirrors protobuf `index_section`).
    pub index_section: Option<usize>,

    /// Creation time in nanoseconds since the Unix epoch; 0 means "unset".
    pub timestamp_nanos: u128,

    /// Optional tag for this version (an empty string on the wire means
    /// unset).
    pub tag: Option<String>,

    /// Feature flags checked by readers (e.g. `FLAG_STABLE_ROW_IDS`).
    pub reader_feature_flags: u64,

    /// Feature flags relevant to writers (see the `feature_flags` module).
    pub writer_feature_flags: u64,

    /// Highest fragment id ever observed; `None` until first tracked.
    /// Maintained by `update_max_fragment_id`, which never lowers it.
    pub max_fragment_id: Option<u32>,

    /// Path of the transaction file describing the change that produced this
    /// version, when recorded.
    pub transaction_file: Option<String>,

    /// Starting row offset of each fragment, plus a trailing entry holding
    /// the total row count; kept in sync with `fragments`
    /// (see `compute_fragment_offsets`).
    fragment_offsets: Vec<usize>,

    /// Next unused row id; carried forward across versions by
    /// `new_from_previous`.
    pub next_row_id: u64,

    /// File format and format version used for the data files.
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key/value pairs.
    pub config: HashMap<String, String>,

    /// Table-level metadata key/value pairs.
    pub table_metadata: HashMap<String, String>,

    /// Version of the companion blob dataset, if any (0 on the wire means
    /// none).
    pub blob_dataset_version: Option<u64>,

    /// Registered base paths keyed by id; data files resolve against these
    /// via their `base_id` (see `shallow_clone`).
    pub base_paths: HashMap<u32, BasePath>,
}
108
/// Bit set in a version number to mark it as detached.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns `true` when `version` carries the detached-version marker bit.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
115
116fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
117 fragments
118 .iter()
119 .map(|f| f.num_rows().unwrap_or_default())
120 .chain([0]) .scan(0_usize, |offset, len| {
122 let start = *offset;
123 *offset += len;
124 Some(start)
125 })
126 .collect()
127}
128
/// Aggregate statistics over a manifest's fragments, as produced by
/// `Manifest::summary`.
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments in the manifest.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of known data-file sizes in bytes (files with an unknown size are
    /// skipped).
    pub total_files_size: u64,
    /// Number of fragments that carry a deletion file.
    pub total_deletion_files: u64,
    /// Rows physically stored in data files (live rows plus deleted rows).
    pub total_data_file_rows: u64,
    /// Rows marked deleted, summed over deletion files with known counts.
    pub total_deletion_file_rows: u64,
    /// Live (readable) rows across all fragments.
    pub total_rows: u64,
}
139
140impl From<ManifestSummary> for BTreeMap<String, String> {
141 fn from(summary: ManifestSummary) -> Self {
142 let mut stats_map = Self::new();
143 stats_map.insert(
144 "total_fragments".to_string(),
145 summary.total_fragments.to_string(),
146 );
147 stats_map.insert(
148 "total_data_files".to_string(),
149 summary.total_data_files.to_string(),
150 );
151 stats_map.insert(
152 "total_files_size".to_string(),
153 summary.total_files_size.to_string(),
154 );
155 stats_map.insert(
156 "total_deletion_files".to_string(),
157 summary.total_deletion_files.to_string(),
158 );
159 stats_map.insert(
160 "total_data_file_rows".to_string(),
161 summary.total_data_file_rows.to_string(),
162 );
163 stats_map.insert(
164 "total_deletion_file_rows".to_string(),
165 summary.total_deletion_file_rows.to_string(),
166 );
167 stats_map.insert("total_rows".to_string(), summary.total_rows.to_string());
168 stats_map
169 }
170}
171
impl Manifest {
    /// Creates a brand-new manifest at version 1 with cleared flags, no
    /// timestamp, and empty config/metadata maps.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // The local schema keeps only fields with the default storage class.
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    /// Creates the successor of `previous` (version + 1) with a new schema
    /// and fragment list, carrying over branch, config, metadata, storage
    /// format, row-id counter, max fragment id, and base paths. Feature
    /// flags, index section, timestamp, and tag are reset.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Keep the previous blob dataset version unless a new one is given.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Produces a shallow clone of this manifest that references the same
    /// data files through a new base path (`ref_base_id` -> `ref_path`).
    /// Files and deletion files without a base id are re-pointed at the new
    /// base path; the index section and tag are dropped and feature flags
    /// are cleared in the clone.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                // Files that previously resolved against the dataset root now
                // resolve against the newly registered base path.
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            index_section: None,
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                // Register the new base path alongside the existing ones.
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Returns the manifest timestamp as a UTC datetime. An unset (0) or
    /// out-of-range timestamp yields the Unix epoch via `unwrap_or_default`.
    pub fn timestamp(&self) -> DateTime<Utc> {
        // Split the u128 nanosecond count into whole seconds + sub-second
        // nanos for chrono.
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Sets the manifest timestamp, in nanoseconds since the Unix epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Mutable access to the dataset configuration key/value map.
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Mutable access to the table metadata key/value map.
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Mutable access to the schema-level metadata map.
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Mutable access to the metadata map of the field with `field_id`, or
    /// `None` when no such field exists.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Inserts or overwrites the given config entries.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Removes the given keys from the config; missing keys are ignored.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replaces the entire schema metadata map.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replaces the metadata of the field with `field_id`, returning an
    /// invalid-input error when the field does not exist.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Raises `max_fragment_id` to the largest fragment id currently present.
    /// The stored maximum is never lowered.
    pub fn update_max_fragment_id(&mut self) {
        if self.fragments.is_empty() {
            // Nothing to observe; keep whatever maximum we already track.
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap()
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only move the watermark forward, never backward.
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Returns the highest known fragment id: the tracked maximum when
    /// present, otherwise the largest id among the current fragments.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            Some(max_id.into())
        } else {
            // No tracked maximum; fall back to scanning the fragment list.
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Returns the maximum field id across the schema and all data files,
    /// or -1 when both are empty.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        // Data files may reference field ids beyond the schema's maximum.
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Returns the fragments whose id is greater than `since`'s maximum
    /// fragment id, i.e. those added after the `since` manifest. Errors when
    /// `since` is not strictly older than this manifest.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            // No prior maximum means every fragment counts as new.
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Returns the fragments overlapping the given row-offset range, each
    /// paired with its starting row offset.
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Offsets are sorted, so binary search finds the fragment containing
        // `start`; on Err the insertion point is one past that fragment.
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop at the first fragment that starts at/after `end` or ends
            // at/before `start`.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// True when the stable-row-id reader feature flag is set.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Serializes this manifest to protobuf-encoded bytes.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True when the data files use the legacy file-format version.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Aggregates per-fragment statistics (file counts, sizes, row totals)
    /// into a `ManifestSummary`.
    pub fn summary(&self) -> ManifestSummary {
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    summary.total_data_files += f.files.len() as u64;
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    for data_file in &f.files {
                        // File sizes are optional; only known sizes count.
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    if let Some(deletion_file) = &f.deletion_file {
                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
                            summary.total_deletion_file_rows += num_deleted as u64;
                        }
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Rows physically in data files = live rows + deleted rows.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
572
/// A registered base location that data files can resolve against,
/// referenced from data files and deletion files by id.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Unique id referenced by a data/deletion file's `base_id`.
    pub id: u32,
    /// Optional human-readable name for this base path.
    pub name: Option<String>,
    /// True when this base path is a dataset root (set for shallow clones;
    /// see `Manifest::shallow_clone`).
    pub is_dataset_root: bool,
    /// URI or path string; resolved through `extract_path`.
    pub path: String,
}
581
582impl BasePath {
583 pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
592 Self {
593 id,
594 name,
595 is_dataset_root,
596 path,
597 }
598 }
599
600 pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
604 ObjectStore::extract_path_from_uri(registry, &self.path)
605 }
606}
607
608impl DeepSizeOf for BasePath {
609 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
610 self.name.deep_size_of_children(context)
611 + self.path.deep_size_of_children(context) * 2
612 + size_of::<bool>()
613 }
614}
615
/// Identifies the library (and its version string) that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Writing library's name (e.g. "lance").
    pub library: String,
    /// Version string, expected to parse as `major.minor.patch[-tag]`
    /// (see `semver`).
    pub version: String,
}
621
/// Describes the file format and format version used for data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (see `LANCE_FORMAT_NAME`).
    pub file_format: String,
    /// Format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
627
628const LANCE_FORMAT_NAME: &str = "lance";
629
630impl DataStorageFormat {
631 pub fn new(version: LanceFileVersion) -> Self {
632 Self {
633 file_format: LANCE_FORMAT_NAME.to_string(),
634 version: version.resolve().to_string(),
635 }
636 }
637
638 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
639 self.version.parse::<LanceFileVersion>()
640 }
641}
642
impl Default for DataStorageFormat {
    /// Uses the library's default Lance file version.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
648
649impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
650 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
651 Self {
652 file_format: pb.file_format,
653 version: pb.version,
654 }
655 }
656}
657
/// Which component of a semantic version to increment in
/// `WriterVersion::bump`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
664
665impl WriterVersion {
666 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
669 let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
671 (
672 &self.version[..dash_idx],
673 Some(&self.version[dash_idx + 1..]),
674 )
675 } else {
676 (self.version.as_str(), None)
677 };
678
679 let mut parts = version_part.split('.');
680 let major = parts.next().unwrap_or("0").parse().ok()?;
681 let minor = parts.next().unwrap_or("0").parse().ok()?;
682 let patch = parts.next().unwrap_or("0").parse().ok()?;
683
684 Some((major, minor, patch, tag))
685 }
686
687 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
688 self.semver()
689 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
690 }
691
692 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
694 let version = self.semver_or_panic();
695 (version.0, version.1, version.2) < (major, minor, patch)
696 }
697
698 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
699 let parts = self.semver_or_panic();
700 let tag = if keep_tag { parts.3 } else { None };
701 let new_parts = match part {
702 VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
703 VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
704 VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
705 };
706 let new_version = if let Some(tag) = tag {
707 format!("{}.{}.{}-{}", new_parts.0, new_parts.1, new_parts.2, tag)
708 } else {
709 format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
710 };
711 Self {
712 library: self.library.clone(),
713 version: new_version,
714 }
715 }
716}
717
impl Default for WriterVersion {
    /// Production builds report this crate's own name and version.
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // Test builds bump the patch component by one; `test_writer_version`
    // accounts for this offset.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}
737
/// Declares `pb::Manifest` as the protobuf form of [`Manifest`], enabling
/// generic proto-backed reads such as `read_struct`.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
741
742impl From<pb::BasePath> for BasePath {
743 fn from(p: pb::BasePath) -> Self {
744 Self::new(p.id, p.path, p.name, p.is_dataset_root)
745 }
746}
747
748impl From<BasePath> for pb::BasePath {
749 fn from(p: BasePath) -> Self {
750 Self {
751 id: p.id,
752 name: p.name,
753 is_dataset_root: p.is_dataset_root,
754 path: p.path,
755 }
756 }
757}
758
759impl TryFrom<pb::Manifest> for Manifest {
760 type Error = Error;
761
762 fn try_from(p: pb::Manifest) -> Result<Self> {
763 let timestamp_nanos = p.timestamp.map(|ts| {
764 let sec = ts.seconds as u128 * 1e9 as u128;
765 let nanos = ts.nanos as u128;
766 sec + nanos
767 });
768 let writer_version = match p.writer_version {
770 Some(pb::manifest::WriterVersion { library, version }) => {
771 Some(WriterVersion { library, version })
772 }
773 _ => None,
774 };
775 let fragments = Arc::new(
776 p.fragments
777 .into_iter()
778 .map(Fragment::try_from)
779 .collect::<Result<Vec<_>>>()?,
780 );
781 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
782 let fields_with_meta = FieldsWithMeta {
783 fields: Fields(p.fields),
784 metadata: p.schema_metadata,
785 };
786
787 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
788 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
789 {
790 return Err(Error::Internal {
791 message: "All fragments must have row ids".into(),
792 location: location!(),
793 });
794 }
795
796 let data_storage_format = match p.data_format {
797 None => {
798 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
799 DataStorageFormat::new(inferred_version)
801 } else {
802 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
804 DataStorageFormat::new(LanceFileVersion::Stable)
805 } else {
806 DataStorageFormat::new(LanceFileVersion::Legacy)
807 }
808 }
809 }
810 Some(format) => DataStorageFormat::from(format),
811 };
812
813 let schema = Schema::from(fields_with_meta);
814 let local_schema = schema.retain_storage_class(StorageClass::Default);
815
816 Ok(Self {
817 schema,
818 local_schema,
819 version: p.version,
820 branch: p.branch,
821 writer_version,
822 version_aux_data: p.version_aux_data as usize,
823 index_section: p.index_section.map(|i| i as usize),
824 timestamp_nanos: timestamp_nanos.unwrap_or(0),
825 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
826 reader_feature_flags: p.reader_feature_flags,
827 writer_feature_flags: p.writer_feature_flags,
828 max_fragment_id: p.max_fragment_id,
829 fragments,
830 transaction_file: if p.transaction_file.is_empty() {
831 None
832 } else {
833 Some(p.transaction_file)
834 },
835 fragment_offsets,
836 next_row_id: p.next_row_id,
837 data_storage_format,
838 config: p.config,
839 table_metadata: p.table_metadata,
840 blob_dataset_version: if p.blob_dataset_version == 0 {
841 None
842 } else {
843 Some(p.blob_dataset_version)
844 },
845 base_paths: p
846 .base_paths
847 .iter()
848 .map(|item| (item.id, item.clone().into()))
849 .collect(),
850 })
851 }
852}
853
854impl From<&Manifest> for pb::Manifest {
855 fn from(m: &Manifest) -> Self {
856 let timestamp_nanos = if m.timestamp_nanos == 0 {
857 None
858 } else {
859 let nanos = m.timestamp_nanos % 1e9 as u128;
860 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
861 Some(Timestamp {
862 seconds,
863 nanos: nanos as i32,
864 })
865 };
866 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
867 Self {
868 fields: fields_with_meta.fields.0,
869 schema_metadata: m
870 .schema
871 .metadata
872 .iter()
873 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
874 .collect(),
875 version: m.version,
876 branch: m.branch.clone(),
877 writer_version: m
878 .writer_version
879 .as_ref()
880 .map(|wv| pb::manifest::WriterVersion {
881 library: wv.library.clone(),
882 version: wv.version.clone(),
883 }),
884 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
885 table_metadata: m.table_metadata.clone(),
886 version_aux_data: m.version_aux_data as u64,
887 index_section: m.index_section.map(|i| i as u64),
888 timestamp: timestamp_nanos,
889 tag: m.tag.clone().unwrap_or_default(),
890 reader_feature_flags: m.reader_feature_flags,
891 writer_feature_flags: m.writer_feature_flags,
892 max_fragment_id: m.max_fragment_id,
893 transaction_file: m.transaction_file.clone().unwrap_or_default(),
894 next_row_id: m.next_row_id,
895 data_format: Some(pb::manifest::DataStorageFormat {
896 file_format: m.data_storage_format.file_format.clone(),
897 version: m.data_storage_format.version.clone(),
898 }),
899 config: m.config.clone(),
900 blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
901 base_paths: m
902 .base_paths
903 .values()
904 .map(|base_path| pb::BasePath {
905 id: base_path.id,
906 name: base_path.name.clone(),
907 is_dataset_root: base_path.is_dataset_root,
908 path: base_path.path.clone(),
909 })
910 .collect(),
911 }
912 }
913}
914
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Opens the file at `path` and reads its schema from the manifest
    /// embedded in the file itself, rather than from a caller-supplied
    /// schema.
    ///
    /// The default implementation opens a reader through the object store
    /// and delegates to [`Self::try_new_self_described_from_reader`].
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Creates a reader from an already-open [`Reader`], using the manifest
    /// embedded in the file for the schema.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
943
944#[async_trait]
945impl SelfDescribingFileReader for FileReader {
946 async fn try_new_self_described_from_reader(
947 reader: Arc<dyn Reader>,
948 cache: Option<&LanceCache>,
949 ) -> Result<Self> {
950 let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
951 let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
952 message: format!(
953 "Attempt to open file at {} as self-describing but it did not contain a manifest",
954 reader.path(),
955 ),
956 location: location!(),
957 })?;
958 let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
959 if manifest.should_use_legacy_format() {
960 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
961 }
962 let schema = manifest.schema;
963 let max_field_id = schema.max_field_id().unwrap_or_default();
964 Self::try_new_from_reader(
965 reader.path(),
966 reader.clone(),
967 Some(metadata),
968 schema,
969 0,
970 0,
971 max_field_id,
972 cache,
973 )
974 .await
975 }
976}
977
#[cfg(test)]
mod tests {
    use crate::format::{DataFile, DeletionFile, DeletionFileType};
    use std::num::NonZero;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    // In test builds WriterVersion::default() bumps the patch component by
    // one (see the #[cfg(test)] Default impl); the assertions below account
    // for that offset.
    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();

        let cargo_version = env!("CARGO_PKG_VERSION");
        // Pre-release versions like "1.2.3-beta.1" keep their tag.
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // Patch is +1 because of the test-only Default bump.
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                expected_tag
            )
        );

        // Undoing the bump must recover the crate's base version.
        let base_version = cargo_version.split('-').next().unwrap();
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            base_version
        );

        // Bumping any component must produce a strictly newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    // Fragments hold 10, 15, and 20 rows, so their starting offsets are
    // 0, 10, and 25 (45 rows total).
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Range entirely inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first two fragments.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range running past the end returns only existing fragments.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges starting at or beyond the total row count yield nothing.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    // The maximum field id considers both schema fields and data-file field
    // lists; here a data file references id 43, above the schema's maximum.
    #[test]
    fn test_max_field_id() {
        // A schema with field ids 0 and 2 (leaving a hole at 1).
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    // config_mut() must expose the underlying map for inserts and removals.
    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.config_mut().extend(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.config_mut().remove("other-key");
        assert_eq!(manifest.config, config);
    }

    // summary() over empty, zero-row, populated, and deletion-bearing
    // manifests.
    #[test]
    fn test_manifest_summary() {
        let arrow_schema = ArrowSchema::new(vec![
            ArrowField::new("id", arrow_schema::DataType::Int64, false),
            ArrowField::new("name", arrow_schema::DataType::Utf8, true),
        ]);
        let schema = Schema::try_from(&arrow_schema).unwrap();

        // No fragments at all: every counter is zero.
        let empty_manifest = Manifest::new(
            schema.clone(),
            Arc::new(vec![]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let empty_summary = empty_manifest.summary();
        assert_eq!(empty_summary.total_rows, 0);
        assert_eq!(empty_summary.total_files_size, 0);
        assert_eq!(empty_summary.total_fragments, 0);
        assert_eq!(empty_summary.total_data_files, 0);
        assert_eq!(empty_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_summary.total_data_file_rows, 0);
        assert_eq!(empty_summary.total_deletion_files, 0);

        // Fragments exist but hold zero rows: only counts are non-zero.
        let empty_fragments = vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ];

        let empty_files_manifest = Manifest::new(
            schema.clone(),
            Arc::new(empty_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let empty_files_summary = empty_files_manifest.summary();
        assert_eq!(empty_files_summary.total_rows, 0);
        assert_eq!(empty_files_summary.total_files_size, 0);
        assert_eq!(empty_files_summary.total_fragments, 2);
        assert_eq!(empty_files_summary.total_data_files, 2);
        assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
        assert_eq!(empty_files_summary.total_data_file_rows, 0);
        assert_eq!(empty_files_summary.total_deletion_files, 0);

        // 100 + 250 + 75 = 425 rows, no deletions, no known file sizes.
        let real_fragments = vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ];

        let real_data_manifest = Manifest::new(
            schema.clone(),
            Arc::new(real_fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let real_data_summary = real_data_manifest.summary();
        assert_eq!(real_data_summary.total_rows, 425);
        assert_eq!(real_data_summary.total_files_size, 0);
        assert_eq!(real_data_summary.total_fragments, 3);
        assert_eq!(real_data_summary.total_data_files, 3);
        assert_eq!(real_data_summary.total_deletion_file_rows, 0);
        assert_eq!(real_data_summary.total_data_file_rows, 425);
        assert_eq!(real_data_summary.total_deletion_files, 0);

        // One fragment: 50 physical rows, 10 deleted, a 1000-byte file.
        let file_version = LanceFileVersion::default();
        let mut fragment_with_deletion = Fragment::new(0)
            .with_file(
                "data_with_deletion.lance",
                vec![0, 1],
                vec![0, 1],
                &file_version,
                NonZero::new(1000),
            )
            .with_physical_rows(50);
        fragment_with_deletion.deletion_file = Some(DeletionFile {
            read_version: 123,
            id: 456,
            file_type: DeletionFileType::Array,
            num_deleted_rows: Some(10),
            base_id: None,
        });

        let manifest_with_deletion = Manifest::new(
            schema,
            Arc::new(vec![fragment_with_deletion]),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        let deletion_summary = manifest_with_deletion.summary();
        // 50 physical rows minus 10 deleted leaves 40 live rows.
        assert_eq!(deletion_summary.total_rows, 40);
        assert_eq!(deletion_summary.total_files_size, 1000);
        assert_eq!(deletion_summary.total_fragments, 1);
        assert_eq!(deletion_summary.total_data_files, 1);
        assert_eq!(deletion_summary.total_deletion_file_rows, 10);
        assert_eq!(deletion_summary.total_data_file_rows, 50);
        assert_eq!(deletion_summary.total_deletion_files, 1);

        // The BTreeMap conversion exposes all seven counters.
        let stats_map: BTreeMap<String, String> = deletion_summary.into();
        assert_eq!(stats_map.len(), 7)
    }
}