1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::reader::FileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::{Schema, StorageClass};
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// In-memory representation of a dataset manifest: the complete description
/// of one version of a Lance dataset (schema, fragments, flags, metadata).
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// The full dataset schema, including fields of every storage class.
    pub schema: Schema,

    /// Schema restricted to fields with the Default storage class
    /// (derived via `Schema::retain_storage_class`).
    pub local_schema: Schema,

    /// Dataset version number this manifest describes.
    pub version: u64,

    /// Branch name, if this manifest belongs to a branch.
    pub branch: Option<String>,

    /// Version of the writer library that produced this manifest, if known.
    pub writer_version: Option<WriterVersion>,

    /// Fragments (groups of data files) making up the dataset at this version.
    pub fragments: Arc<Vec<Fragment>>,

    // NOTE(review): exact semantics not visible in this file — appears to be a
    // position/size of auxiliary version data; confirm against the writer.
    pub version_aux_data: usize,

    /// File position of the index metadata section, if any indices exist.
    pub index_section: Option<usize>,

    /// Creation timestamp in nanoseconds since the UNIX epoch (0 = unset).
    pub timestamp_nanos: u128,

    /// Optional tag associated with this version (empty on the wire = None).
    pub tag: Option<String>,

    /// Feature flags a reader must understand to read this dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write this dataset.
    pub writer_feature_flags: u64,

    /// Highest fragment id ever assigned, if tracked; `None` on older
    /// datasets (see `max_fragment_id()` for the fallback computation).
    pub max_fragment_id: Option<u32>,

    /// Path of the transaction file that produced this version, if recorded.
    pub transaction_file: Option<String>,

    /// Precomputed starting row offset of each fragment plus a trailing
    /// total; kept in sync with `fragments` via `compute_fragment_offsets`.
    fragment_offsets: Vec<usize>,

    /// Next row id to assign (used with the stable-row-id feature).
    pub next_row_id: u64,

    /// On-disk format (file format name + version) of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key/value pairs.
    pub config: HashMap<String, String>,

    /// User-supplied table-level metadata.
    pub table_metadata: HashMap<String, String>,

    /// Version of the companion blob dataset, if one exists (0 on the wire
    /// means None).
    pub blob_dataset_version: Option<u64>,

    /// Registered base paths (e.g. from shallow clones), keyed by base id.
    pub base_paths: HashMap<u32, BasePath>,
}
108
/// Bit set in a version number to mark it as a detached version
/// (one that is not part of the dataset's linear version history).
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns true when `version` carries the detached-version marker bit.
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK == DETACHED_VERSION_MASK
}
115
116fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
117 fragments
118 .iter()
119 .map(|f| f.num_rows().unwrap_or_default())
120 .chain([0]) .scan(0_usize, |offset, len| {
122 let start = *offset;
123 *offset += len;
124 Some(start)
125 })
126 .collect()
127}
128
/// Aggregate statistics over a manifest's fragments, produced by
/// [`Manifest::summary`]. All counters default to zero.
// Improvement: added `Debug` and `Clone` derives (public type was neither
// printable nor copyable) and collapsed the seven duplicated `insert` calls
// in the `BTreeMap` conversion into a single data-driven collect.
#[derive(Debug, Clone, Default)]
pub struct ManifestSummary {
    /// Number of fragments in the manifest.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of known data-file sizes, in bytes.
    pub total_files_size: u64,
    /// Number of fragments that carry a deletion file.
    pub total_deletion_files: u64,
    /// Physical rows written: live rows plus deleted rows.
    pub total_data_file_rows: u64,
    /// Rows marked deleted by deletion files.
    pub total_deletion_file_rows: u64,
    /// Live (readable) rows.
    pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
    /// Render the summary as a sorted string map, one entry per counter.
    fn from(summary: ManifestSummary) -> Self {
        [
            ("total_fragments", summary.total_fragments),
            ("total_data_files", summary.total_data_files),
            ("total_files_size", summary.total_files_size),
            ("total_deletion_files", summary.total_deletion_files),
            ("total_data_file_rows", summary.total_data_file_rows),
            ("total_deletion_file_rows", summary.total_deletion_file_rows),
            ("total_rows", summary.total_rows),
        ]
        .into_iter()
        .map(|(key, value)| (key.to_string(), value.to_string()))
        .collect()
    }
}
171
impl Manifest {
    /// Create the manifest for a brand-new dataset (version 1).
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
        base_paths: HashMap<u32, BasePath>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        // Fields outside the Default storage class (e.g. blob fields) are
        // excluded from the local schema.
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            branch: None,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            table_metadata: HashMap::new(),
            blob_dataset_version,
            base_paths,
        }
    }

    /// Create the manifest for the next version of an existing dataset,
    /// carrying over branch, config, table metadata, storage format, base
    /// paths, next row id and max fragment id from `previous`, and bumping
    /// the version number by one.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        // Keep the previous blob dataset version unless explicitly replaced.
        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            branch: previous.branch.clone(),
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            // Per-version state (index section, timestamp, tag, feature
            // flags, transaction file) is reset; callers fill it in when
            // committing the new version.
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            table_metadata: previous.table_metadata.clone(),
            blob_dataset_version,
            base_paths: previous.base_paths.clone(),
        }
    }

    /// Create a manifest for a shallow clone of this dataset.
    ///
    /// Every data file and deletion file that does not already reference a
    /// base path is rebased onto a new `BasePath` entry
    /// (`ref_base_id` -> `ref_path`) so the clone can still resolve files in
    /// the source dataset's location.
    pub fn shallow_clone(
        &self,
        ref_name: Option<String>,
        ref_path: String,
        ref_base_id: u32,
        branch_name: Option<String>,
        transaction_file: String,
    ) -> Self {
        let cloned_fragments = self
            .fragments
            .as_ref()
            .iter()
            .map(|fragment| {
                let mut cloned_fragment = fragment.clone();
                for file in &mut cloned_fragment.files {
                    if file.base_id.is_none() {
                        file.base_id = Some(ref_base_id);
                    }
                }

                if let Some(deletion) = &mut cloned_fragment.deletion_file {
                    if deletion.base_id.is_none() {
                        deletion.base_id = Some(ref_base_id);
                    }
                }
                cloned_fragment
            })
            .collect::<Vec<_>>();

        Self {
            schema: self.schema.clone(),
            local_schema: self.local_schema.clone(),
            version: self.version,
            branch: branch_name,
            writer_version: self.writer_version.clone(),
            fragments: Arc::new(cloned_fragments),
            version_aux_data: self.version_aux_data,
            // Indices, tags and feature flags are not carried into the clone.
            index_section: None,
            timestamp_nanos: self.timestamp_nanos,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: self.max_fragment_id,
            transaction_file: Some(transaction_file),
            fragment_offsets: self.fragment_offsets.clone(),
            next_row_id: self.next_row_id,
            data_storage_format: self.data_storage_format.clone(),
            config: self.config.clone(),
            blob_dataset_version: self.blob_dataset_version,
            base_paths: {
                // Register the source dataset's location as a new base path.
                let mut base_paths = self.base_paths.clone();
                let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
                base_paths.insert(ref_base_id, base_path);
                base_paths
            },
            table_metadata: self.table_metadata.clone(),
        }
    }

    /// Creation datetime of this version, derived from `timestamp_nanos`.
    /// Falls back to the UNIX epoch if the value is out of range for a
    /// `DateTime`.
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the creation timestamp (nanoseconds since the UNIX epoch).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Mutable access to the dataset configuration map.
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }

    /// Mutable access to the table metadata map.
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }

    /// Mutable access to the schema-level metadata map.
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }

    /// Mutable access to the metadata of the field with `field_id`, or
    /// `None` if no such field exists.
    pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
        self.schema
            .field_by_id_mut(field_id)
            .map(|field| &mut field.metadata)
    }

    /// Insert/overwrite the given key/value pairs in the config map.
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Remove the given keys from the config map (missing keys are ignored).
    #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replace the schema metadata map wholesale.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replace the metadata of the field with `field_id`.
    ///
    /// # Errors
    /// Returns an invalid-input error if no field has that id.
    #[deprecated(
        note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
    )]
    pub fn replace_field_metadata(
        &mut self,
        field_id: i32,
        new_metadata: HashMap<String, String>,
    ) -> Result<()> {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
            Ok(())
        } else {
            Err(Error::invalid_input(
                format!(
                    "Field with id {} does not exist for replace_field_metadata",
                    field_id
                ),
                location!(),
            ))
        }
    }

    /// Raise `max_fragment_id` to the largest id present in `fragments`.
    /// Never lowers the recorded maximum; no-op when there are no fragments.
    pub fn update_max_fragment_id(&mut self) {
        if self.fragments.is_empty() {
            // Keep the previously recorded maximum.
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap() // non-empty, so max() is Some
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                // Only ever move the recorded maximum forward.
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// The highest fragment id used so far, if known.
    ///
    /// Prefers the recorded `max_fragment_id`; for older manifests that did
    /// not track it, falls back to scanning the current fragment list.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            Some(max_id.into())
        } else {
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// The highest field id referenced by either the schema or any data
    /// file. Returns -1 when there are no fields at all.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Fragments whose ids were assigned after the version captured by
    /// `since`.
    ///
    /// # Errors
    /// Returns an error if `since` is not strictly older than this manifest.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Fragments whose row-offset span overlaps `range`, each paired with
    /// its starting row offset.
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        // Locate the fragment containing `start` (offsets are ascending).
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            // Stop at the first fragment that no longer overlaps the range.
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the stable-row-id reader feature flag is set.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }

    /// Serialize this manifest to its protobuf wire format.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True when the data files use the legacy Lance file format version.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }

    /// Aggregate statistics (fragment/file counts, sizes, row counts) over
    /// all fragments.
    pub fn summary(&self) -> ManifestSummary {
        let mut summary =
            self.fragments
                .iter()
                .fold(ManifestSummary::default(), |mut summary, f| {
                    summary.total_data_files += f.files.len() as u64;
                    if let Some(num_rows) = f.num_rows() {
                        summary.total_rows += num_rows as u64;
                    }
                    for data_file in &f.files {
                        // File sizes are only counted when known.
                        if let Some(size_bytes) = data_file.file_size_bytes.get() {
                            summary.total_files_size += size_bytes.get();
                        }
                    }
                    if f.deletion_file.is_some() {
                        summary.total_deletion_files += 1;
                    }
                    if let Some(deletion_file) = &f.deletion_file {
                        if let Some(num_deleted) = deletion_file.num_deleted_rows {
                            summary.total_deletion_file_rows += num_deleted as u64;
                        }
                    }
                    summary
                });
        summary.total_fragments = self.fragments.len() as u64;
        // Physical rows written = live rows plus rows masked by deletions.
        summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;

        summary
    }
}
572
/// A registered base location that data files and deletion files can be
/// resolved against (e.g. the source dataset of a shallow clone).
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Identifier referenced by data/deletion files via their `base_id`.
    pub id: u32,
    /// Optional human-readable name for this base path.
    pub name: Option<String>,
    /// Whether `path` points at a dataset root.
    pub is_dataset_root: bool,
    /// URI of the base location (see `extract_path`).
    pub path: String,
}
581
582impl BasePath {
583 pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
592 Self {
593 id,
594 name,
595 is_dataset_root,
596 path,
597 }
598 }
599
600 pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
604 ObjectStore::extract_path_from_uri(registry, &self.path)
605 }
606}
607
impl DeepSizeOf for BasePath {
    fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
        // NOTE(review): `path` is counted twice (`* 2`) and an inline bool's
        // size is added even though `deep_size_of_children` normally counts
        // only heap children — presumably deliberate accounting for copies
        // held elsewhere; confirm rather than "fixing".
        self.name.deep_size_of_children(context)
            + self.path.deep_size_of_children(context) * 2
            + size_of::<bool>()
    }
}
615
/// Identifies the library (and its semver components) that wrote a dataset
/// version.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Writing library name, e.g. "lance".
    pub library: String,
    /// Clean "major.minor.patch" version string (no prerelease/build parts).
    pub version: String,
    /// Semver prerelease component (e.g. "rc.1"), if any.
    pub prerelease: Option<String>,
    /// Semver build metadata component (e.g. "build.123"), if any.
    pub build_metadata: Option<String>,
}
623
/// Describes the on-disk format of the dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// File format name (this crate always writes "lance").
    pub file_format: String,
    /// File format version string, parseable as a `LanceFileVersion`.
    pub version: String,
}
629
630const LANCE_FORMAT_NAME: &str = "lance";
631
632impl DataStorageFormat {
633 pub fn new(version: LanceFileVersion) -> Self {
634 Self {
635 file_format: LANCE_FORMAT_NAME.to_string(),
636 version: version.resolve().to_string(),
637 }
638 }
639
640 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
641 self.version.parse::<LanceFileVersion>()
642 }
643}
644
645impl Default for DataStorageFormat {
646 fn default() -> Self {
647 Self::new(LanceFileVersion::default())
648 }
649}
650
651impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
652 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
653 Self {
654 file_format: pb.file_format,
655 version: pb.version,
656 }
657 }
658}
659
/// Which component of a semver version to bump (see `bump_version`).
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}
666
667fn bump_version(version: &mut semver::Version, part: VersionPart) {
668 match part {
669 VersionPart::Major => {
670 version.major += 1;
671 version.minor = 0;
672 version.patch = 0;
673 }
674 VersionPart::Minor => {
675 version.minor += 1;
676 version.patch = 0;
677 }
678 VersionPart::Patch => {
679 version.patch += 1;
680 }
681 }
682}
683
impl WriterVersion {
    /// Split a full semver string into (clean "major.minor.patch" string,
    /// prerelease, build metadata). Returns `None` if the input is not
    /// valid semver.
    fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
        let mut parsed = semver::Version::parse(full_version).ok()?;

        let prerelease = if parsed.pre.is_empty() {
            None
        } else {
            Some(parsed.pre.to_string())
        };

        let build_metadata = if parsed.build.is_empty() {
            None
        } else {
            Some(parsed.build.to_string())
        };

        // Strip pre/build so `to_string()` yields only "major.minor.patch".
        parsed.pre = semver::Prerelease::EMPTY;
        parsed.build = semver::BuildMetadata::EMPTY;
        Some((parsed.to_string(), prerelease, build_metadata))
    }

    /// Hand-rolled parse of `self.version`: splits on the first '-' for a
    /// tag, then on '.' for the numeric parts. Missing parts default to 0.
    /// Does not handle '+' build metadata.
    #[deprecated(note = "Use `lance_lib_version()` instead")]
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
            (
                &self.version[..dash_idx],
                Some(&self.version[dash_idx + 1..]),
            )
        } else {
            (self.version.as_str(), None)
        };

        let mut parts = version_part.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;

        Some((major, minor, patch, tag))
    }

    /// Reconstruct the writer's full `semver::Version` (including the
    /// prerelease and build-metadata components), or `None` if the writer
    /// is not the "lance" library or any component fails to parse.
    pub fn lance_lib_version(&self) -> Option<semver::Version> {
        if self.library != "lance" {
            return None;
        }

        let mut version = semver::Version::parse(&self.version).ok()?;

        if let Some(ref prerelease) = self.prerelease {
            version.pre = semver::Prerelease::new(prerelease).ok()?;
        }

        if let Some(ref build_metadata) = self.build_metadata {
            version.build = semver::BuildMetadata::new(build_metadata).ok()?;
        }

        Some(version)
    }

    /// Like `semver()` but panics instead of returning `None`.
    #[deprecated(
        note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
    )]
    #[allow(deprecated)]
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// True when this writer version is strictly older than
    /// `major.minor.patch` under semver ordering (so a prerelease of X is
    /// older than X itself — see the unit tests below).
    ///
    /// # Panics
    /// Panics if this is not a valid lance library version.
    #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self
            .lance_lib_version()
            .expect("Not lance library or invalid version");
        let other = semver::Version {
            major: major.into(),
            minor: minor.into(),
            patch: patch.into(),
            pre: semver::Prerelease::EMPTY,
            build: semver::BuildMetadata::EMPTY,
        };
        version < other
    }

    /// Return a copy with the given version part bumped (lower parts reset);
    /// the prerelease tag is dropped unless `keep_tag` is set.
    ///
    /// # Panics
    /// Panics if this is not a valid lance library version.
    #[deprecated(note = "This is meant for testing and will be made private in future version.")]
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let mut version = self.lance_lib_version().expect("Should be lance version");
        bump_version(&mut version, part);
        if !keep_tag {
            version.pre = semver::Prerelease::EMPTY;
        }
        // Round-trip through split_version to re-separate the components.
        let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
            .expect("Bumped version should be valid semver");
        Self {
            library: self.library.clone(),
            version: clean_version,
            prerelease,
            build_metadata,
        }
    }
}
808
impl Default for WriterVersion {
    /// Default to this crate's own version, with library "lance".
    #[cfg(not(test))]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
    }

    /// In unit tests, report one patch version AHEAD of the crate version so
    /// that version-comparison code paths can be exercised (see
    /// `test_writer_version`).
    #[cfg(test)]
    #[allow(deprecated)]
    fn default() -> Self {
        let full_version = env!("CARGO_PKG_VERSION");
        let (version, prerelease, build_metadata) =
            Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
        Self {
            library: "lance".to_string(),
            version,
            prerelease,
            build_metadata,
        }
        .bump(VersionPart::Patch, true)
    }
}
839
// Associates `Manifest` with its protobuf message type so generic helpers
// such as `read_struct` can (de)serialize it.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
843
844impl From<pb::BasePath> for BasePath {
845 fn from(p: pb::BasePath) -> Self {
846 Self::new(p.id, p.path, p.name, p.is_dataset_root)
847 }
848}
849
850impl From<BasePath> for pb::BasePath {
851 fn from(p: BasePath) -> Self {
852 Self {
853 id: p.id,
854 name: p.name,
855 is_dataset_root: p.is_dataset_root,
856 path: p.path,
857 }
858 }
859}
860
861impl TryFrom<pb::Manifest> for Manifest {
862 type Error = Error;
863
864 fn try_from(p: pb::Manifest) -> Result<Self> {
865 let timestamp_nanos = p.timestamp.map(|ts| {
866 let sec = ts.seconds as u128 * 1e9 as u128;
867 let nanos = ts.nanos as u128;
868 sec + nanos
869 });
870 let writer_version = match p.writer_version {
872 Some(pb::manifest::WriterVersion {
873 library,
874 version,
875 prerelease,
876 build_metadata,
877 }) => Some(WriterVersion {
878 library,
879 version,
880 prerelease,
881 build_metadata,
882 }),
883 _ => None,
884 };
885 let fragments = Arc::new(
886 p.fragments
887 .into_iter()
888 .map(Fragment::try_from)
889 .collect::<Result<Vec<_>>>()?,
890 );
891 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
892 let fields_with_meta = FieldsWithMeta {
893 fields: Fields(p.fields),
894 metadata: p.schema_metadata,
895 };
896
897 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
898 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
899 {
900 return Err(Error::Internal {
901 message: "All fragments must have row ids".into(),
902 location: location!(),
903 });
904 }
905
906 let data_storage_format = match p.data_format {
907 None => {
908 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
909 DataStorageFormat::new(inferred_version)
911 } else {
912 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
914 DataStorageFormat::new(LanceFileVersion::Stable)
915 } else {
916 DataStorageFormat::new(LanceFileVersion::Legacy)
917 }
918 }
919 }
920 Some(format) => DataStorageFormat::from(format),
921 };
922
923 let schema = Schema::from(fields_with_meta);
924 let local_schema = schema.retain_storage_class(StorageClass::Default);
925
926 Ok(Self {
927 schema,
928 local_schema,
929 version: p.version,
930 branch: p.branch,
931 writer_version,
932 version_aux_data: p.version_aux_data as usize,
933 index_section: p.index_section.map(|i| i as usize),
934 timestamp_nanos: timestamp_nanos.unwrap_or(0),
935 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
936 reader_feature_flags: p.reader_feature_flags,
937 writer_feature_flags: p.writer_feature_flags,
938 max_fragment_id: p.max_fragment_id,
939 fragments,
940 transaction_file: if p.transaction_file.is_empty() {
941 None
942 } else {
943 Some(p.transaction_file)
944 },
945 fragment_offsets,
946 next_row_id: p.next_row_id,
947 data_storage_format,
948 config: p.config,
949 table_metadata: p.table_metadata,
950 blob_dataset_version: if p.blob_dataset_version == 0 {
951 None
952 } else {
953 Some(p.blob_dataset_version)
954 },
955 base_paths: p
956 .base_paths
957 .iter()
958 .map(|item| (item.id, item.clone().into()))
959 .collect(),
960 })
961 }
962}
963
964impl From<&Manifest> for pb::Manifest {
965 fn from(m: &Manifest) -> Self {
966 let timestamp_nanos = if m.timestamp_nanos == 0 {
967 None
968 } else {
969 let nanos = m.timestamp_nanos % 1e9 as u128;
970 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
971 Some(Timestamp {
972 seconds,
973 nanos: nanos as i32,
974 })
975 };
976 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
977 Self {
978 fields: fields_with_meta.fields.0,
979 schema_metadata: m
980 .schema
981 .metadata
982 .iter()
983 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
984 .collect(),
985 version: m.version,
986 branch: m.branch.clone(),
987 writer_version: m
988 .writer_version
989 .as_ref()
990 .map(|wv| pb::manifest::WriterVersion {
991 library: wv.library.clone(),
992 version: wv.version.clone(),
993 prerelease: wv.prerelease.clone(),
994 build_metadata: wv.build_metadata.clone(),
995 }),
996 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
997 table_metadata: m.table_metadata.clone(),
998 version_aux_data: m.version_aux_data as u64,
999 index_section: m.index_section.map(|i| i as u64),
1000 timestamp: timestamp_nanos,
1001 tag: m.tag.clone().unwrap_or_default(),
1002 reader_feature_flags: m.reader_feature_flags,
1003 writer_feature_flags: m.writer_feature_flags,
1004 max_fragment_id: m.max_fragment_id,
1005 transaction_file: m.transaction_file.clone().unwrap_or_default(),
1006 next_row_id: m.next_row_id,
1007 data_format: Some(pb::manifest::DataStorageFormat {
1008 file_format: m.data_storage_format.file_format.clone(),
1009 version: m.data_storage_format.version.clone(),
1010 }),
1011 config: m.config.clone(),
1012 blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
1013 base_paths: m
1014 .base_paths
1015 .values()
1016 .map(|base_path| pb::BasePath {
1017 id: base_path.id,
1018 name: base_path.name.clone(),
1019 is_dataset_root: base_path.is_dataset_root,
1020 path: base_path.path.clone(),
1021 })
1022 .collect(),
1023 }
1024 }
1025}
1026
/// Reader support for "self-describing" Lance files: files that embed a full
/// `Manifest` (and therefore their schema) in their own metadata.
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open the file at `path` and initialize the reader from the manifest
    /// embedded in the file itself.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    /// Same as `try_new_self_described`, but starting from an already-open
    /// reader.
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}
1055
#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        // A self-describing file must record where its manifest lives.
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        // Legacy-format files store dictionary data separately from the
        // schema; load it before using the schema.
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}
1089
1090#[cfg(test)]
1091mod tests {
1092 use crate::format::{DataFile, DeletionFile, DeletionFileType};
1093 use std::num::NonZero;
1094
1095 use super::*;
1096
1097 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1098 use lance_core::datatypes::Field;
1099
    #[test]
    fn test_writer_version() {
        // Under cfg(test), WriterVersion::default() is the crate version with
        // the patch component bumped by one (see Default for WriterVersion).
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        let cargo_version = env!("CARGO_PKG_VERSION");
        let expected_tag = if cargo_version.contains('-') {
            Some(cargo_version.split('-').nth(1).unwrap())
        } else {
            None
        };

        // `version` holds only "major.minor.patch"; pre/build live in their
        // own fields.
        let version_parts: Vec<&str> = wv.version.split('.').collect();
        assert_eq!(
            version_parts.len(),
            3,
            "Version should be major.minor.patch"
        );
        assert!(
            !wv.version.contains('-'),
            "Version field should not contain prerelease"
        );

        assert_eq!(wv.prerelease.as_deref(), expected_tag);
        assert_eq!(wv.build_metadata, None);

        let version = wv.lance_lib_version().unwrap();
        assert_eq!(
            version.major,
            env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
        );
        assert_eq!(
            version.minor,
            env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
        );
        // +1 because the test-mode Default bumps the patch version.
        assert_eq!(
            version.patch,
            env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
        );
        assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));

        // Bumping any part must always produce a strictly newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let mut bumped_version = version.clone();
            bump_version(&mut bumped_version, *part);
            assert!(version < bumped_version);
        }
    }
1153
    #[test]
    fn test_writer_version_split() {
        // Prerelease only.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, None);

        // Plain release.
        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, None);

        // Both prerelease and build metadata.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Build metadata only.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Invalid semver yields None.
        assert!(WriterVersion::split_version("not-a-version").is_none());
    }
1186
    #[test]
    fn test_writer_version_comparison_with_prerelease() {
        // Semver ordering: 2.0.0-rc.1 sorts before the final 2.0.0 release.
        let v1 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        let v2 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        let semver1 = v1.lance_lib_version().unwrap();
        let semver2 = v2.lance_lib_version().unwrap();

        assert!(semver1 < semver2);
    }
1209
    #[test]
    fn test_writer_version_with_build_metadata() {
        // Prerelease and build metadata are reattached when reconstructing
        // the full semver value from the split fields.
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: Some("build.123".to_string()),
        };

        let semver = v.lance_lib_version().unwrap();
        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
        assert_eq!(semver.major, 2);
        assert_eq!(semver.minor, 0);
        assert_eq!(semver.patch, 0);
        assert_eq!(semver.pre.as_str(), "rc.1");
        assert_eq!(semver.build.as_str(), "build.123");
    }
1227
    #[test]
    fn test_writer_version_non_semver() {
        // A non-semver version string is preserved verbatim but cannot be
        // interpreted as a lance library version.
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "custom-build-v1".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        assert!(v.lance_lib_version().is_none());

        assert_eq!(v.library, "lance");
        assert_eq!(v.version, "custom-build-v1");
    }
1245
    #[test]
    #[allow(deprecated)]
    fn test_older_than_with_prerelease() {
        let v_rc = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        // A prerelease is older than its own final release...
        assert!(v_rc.older_than(2, 0, 0));

        // ...and older than any later version...
        assert!(v_rc.older_than(2, 0, 1));

        // ...but not older than earlier versions.
        assert!(!v_rc.older_than(1, 9, 9));

        let v_release = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // A release is not older than itself.
        assert!(!v_release.older_than(2, 0, 0));

        assert!(v_release.older_than(2, 0, 1));
    }
1279
    #[test]
    fn test_fragments_by_offset_range() {
        // Three fragments with 10, 15 and 20 rows -> starting offsets
        // 0, 10 and 25.
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
            HashMap::new(),
        );

        // Range entirely inside the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Range straddling the first two fragments.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Range covering the last two fragments (extends past the end).
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges starting at or beyond the total row count yield nothing.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
1326
1327 #[test]
1328 fn test_max_field_id() {
1329 let mut field0 =
1331 Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
1332 field0.set_id(-1, &mut 0);
1333 let mut field2 =
1334 Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
1335 field2.set_id(-1, &mut 2);
1336
1337 let schema = Schema {
1338 fields: vec![field0, field2],
1339 metadata: Default::default(),
1340 };
1341 let fragments = vec![
1342 Fragment {
1343 id: 0,
1344 files: vec![DataFile::new_legacy_from_fields(
1345 "path1",
1346 vec![0, 1, 2],
1347 None,
1348 )],
1349 deletion_file: None,
1350 row_id_meta: None,
1351 physical_rows: None,
1352 created_at_version_meta: None,
1353 last_updated_at_version_meta: None,
1354 },
1355 Fragment {
1356 id: 1,
1357 files: vec![
1358 DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
1359 DataFile::new_legacy_from_fields("path3", vec![2], None),
1360 ],
1361 deletion_file: None,
1362 row_id_meta: None,
1363 physical_rows: None,
1364 created_at_version_meta: None,
1365 last_updated_at_version_meta: None,
1366 },
1367 ];
1368
1369 let manifest = Manifest::new(
1370 schema,
1371 Arc::new(fragments),
1372 DataStorageFormat::default(),
1373 None,
1374 HashMap::new(),
1375 );
1376
1377 assert_eq!(manifest.max_field_id(), 43);
1378 }
1379
1380 #[test]
1381 fn test_config() {
1382 let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1383 "a",
1384 arrow_schema::DataType::Int64,
1385 false,
1386 )]);
1387 let schema = Schema::try_from(&arrow_schema).unwrap();
1388 let fragments = vec![
1389 Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1390 Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1391 Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1392 ];
1393 let mut manifest = Manifest::new(
1394 schema,
1395 Arc::new(fragments),
1396 DataStorageFormat::default(),
1397 None,
1398 HashMap::new(),
1399 );
1400
1401 let mut config = manifest.config.clone();
1402 config.insert("lance.test".to_string(), "value".to_string());
1403 config.insert("other-key".to_string(), "other-value".to_string());
1404
1405 manifest.config_mut().extend(config.clone());
1406 assert_eq!(manifest.config, config.clone());
1407
1408 config.remove("other-key");
1409 manifest.config_mut().remove("other-key");
1410 assert_eq!(manifest.config, config);
1411 }
1412
1413 #[test]
1414 fn test_manifest_summary() {
1415 let arrow_schema = ArrowSchema::new(vec![
1417 ArrowField::new("id", arrow_schema::DataType::Int64, false),
1418 ArrowField::new("name", arrow_schema::DataType::Utf8, true),
1419 ]);
1420 let schema = Schema::try_from(&arrow_schema).unwrap();
1421
1422 let empty_manifest = Manifest::new(
1423 schema.clone(),
1424 Arc::new(vec![]),
1425 DataStorageFormat::default(),
1426 None,
1427 HashMap::new(),
1428 );
1429
1430 let empty_summary = empty_manifest.summary();
1431 assert_eq!(empty_summary.total_rows, 0);
1432 assert_eq!(empty_summary.total_files_size, 0);
1433 assert_eq!(empty_summary.total_fragments, 0);
1434 assert_eq!(empty_summary.total_data_files, 0);
1435 assert_eq!(empty_summary.total_deletion_file_rows, 0);
1436 assert_eq!(empty_summary.total_data_file_rows, 0);
1437 assert_eq!(empty_summary.total_deletion_files, 0);
1438
1439 let empty_fragments = vec![
1441 Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
1442 Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
1443 ];
1444
1445 let empty_files_manifest = Manifest::new(
1446 schema.clone(),
1447 Arc::new(empty_fragments),
1448 DataStorageFormat::default(),
1449 None,
1450 HashMap::new(),
1451 );
1452
1453 let empty_files_summary = empty_files_manifest.summary();
1454 assert_eq!(empty_files_summary.total_rows, 0);
1455 assert_eq!(empty_files_summary.total_files_size, 0);
1456 assert_eq!(empty_files_summary.total_fragments, 2);
1457 assert_eq!(empty_files_summary.total_data_files, 2);
1458 assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
1459 assert_eq!(empty_files_summary.total_data_file_rows, 0);
1460 assert_eq!(empty_files_summary.total_deletion_files, 0);
1461
1462 let real_fragments = vec![
1464 Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
1465 Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
1466 Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
1467 ];
1468
1469 let real_data_manifest = Manifest::new(
1470 schema.clone(),
1471 Arc::new(real_fragments),
1472 DataStorageFormat::default(),
1473 None,
1474 HashMap::new(),
1475 );
1476
1477 let real_data_summary = real_data_manifest.summary();
1478 assert_eq!(real_data_summary.total_rows, 425); assert_eq!(real_data_summary.total_files_size, 0); assert_eq!(real_data_summary.total_fragments, 3);
1481 assert_eq!(real_data_summary.total_data_files, 3);
1482 assert_eq!(real_data_summary.total_deletion_file_rows, 0);
1483 assert_eq!(real_data_summary.total_data_file_rows, 425);
1484 assert_eq!(real_data_summary.total_deletion_files, 0);
1485
1486 let file_version = LanceFileVersion::default();
1487 let mut fragment_with_deletion = Fragment::new(0)
1489 .with_file(
1490 "data_with_deletion.lance",
1491 vec![0, 1],
1492 vec![0, 1],
1493 &file_version,
1494 NonZero::new(1000),
1495 )
1496 .with_physical_rows(50);
1497 fragment_with_deletion.deletion_file = Some(DeletionFile {
1498 read_version: 123,
1499 id: 456,
1500 file_type: DeletionFileType::Array,
1501 num_deleted_rows: Some(10),
1502 base_id: None,
1503 });
1504
1505 let manifest_with_deletion = Manifest::new(
1506 schema,
1507 Arc::new(vec![fragment_with_deletion]),
1508 DataStorageFormat::default(),
1509 None,
1510 HashMap::new(),
1511 );
1512
1513 let deletion_summary = manifest_with_deletion.summary();
1514 assert_eq!(deletion_summary.total_rows, 40); assert_eq!(deletion_summary.total_files_size, 1000);
1516 assert_eq!(deletion_summary.total_fragments, 1);
1517 assert_eq!(deletion_summary.total_data_files, 1);
1518 assert_eq!(deletion_summary.total_deletion_file_rows, 10);
1519 assert_eq!(deletion_summary.total_data_file_rows, 50);
1520 assert_eq!(deletion_summary.total_deletion_files, 1);
1521
1522 let stats_map: BTreeMap<String, String> = deletion_summary.into();
1524 assert_eq!(stats_map.len(), 7)
1525 }
1526}