1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{Fields, FieldsWithMeta, populate_schema_dictionary};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LEGACY_FORMAT_VERSION, LanceFileVersion};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::Schema;
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26
/// In-memory representation of a dataset manifest.
///
/// Converted from the protobuf `pb::Manifest` via `TryFrom` and back via
/// `From<&Manifest>`; `Manifest::serialized` produces the encoded bytes.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema (fields plus schema-level metadata).
    pub schema: Schema,

    /// Version number. `Manifest::new` starts at 1 and `new_from_previous`
    /// increments the previous manifest's version by one.
    pub version: u64,

    /// Branch this manifest belongs to, if any.
    pub branch: Option<String>,

    /// Library and version that wrote this manifest.
    pub writer_version: Option<WriterVersion>,

    /// The dataset's fragments, in order.
    pub fragments: Arc<Vec<Fragment>>,

    /// Stored as-is in the protobuf `version_aux_data` field.
    /// NOTE(review): presumably a file position — confirm.
    pub version_aux_data: usize,

    /// NOTE(review): presumably the file position of the index section — confirm.
    pub index_section: Option<usize>,

    /// Commit time in nanoseconds since the Unix epoch. 0 is encoded as
    /// "no timestamp" in the protobuf, and a missing timestamp decodes to 0.
    pub timestamp_nanos: u128,

    /// Optional tag; an empty protobuf tag decodes to `None`.
    pub tag: Option<String>,

    /// Reader-side feature flags (e.g. `FLAG_STABLE_ROW_IDS`).
    pub reader_feature_flags: u64,

    /// Writer-side feature flags.
    pub writer_feature_flags: u64,

    /// Highest fragment id recorded. `update_max_fragment_id` only ever raises
    /// it; when `None`, `max_fragment_id()` falls back to the largest id in
    /// `fragments`.
    pub max_fragment_id: Option<u32>,

    /// Name of the transaction file for this version, if any; an empty
    /// protobuf value decodes to `None`.
    pub transaction_file: Option<String>,

    /// NOTE(review): presumably the file position of an inline transaction — confirm.
    pub transaction_section: Option<usize>,

    /// Starting row offset of each fragment followed by the total row count
    /// (so `fragments.len() + 1` entries), derived from `fragments` by
    /// `compute_fragment_offsets` and used by `fragments_by_offset_range`.
    fragment_offsets: Vec<usize>,

    /// Next row id to hand out; carried over by `new_from_previous`.
    pub next_row_id: u64,

    /// File format and format version of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key/value pairs.
    pub config: HashMap<String, String>,

    /// Table-level metadata key/value pairs.
    pub table_metadata: HashMap<String, String>,

    /// Registered base locations keyed by their id; data and deletion files
    /// refer to them through their `base_id`.
    pub base_paths: HashMap<u32, BasePath>,
}
104
/// Bit that marks a manifest version number as "detached" (the most
/// significant bit of the `u64`).
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns `true` when any bit of [`DETACHED_VERSION_MASK`] is set in `version`.
pub fn is_detached_version(version: u64) -> bool {
    let masked = version & DETACHED_VERSION_MASK;
    masked != 0
}
111
112fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
113 fragments
114 .iter()
115 .map(|f| f.num_rows().unwrap_or_default())
116 .chain([0]) .scan(0_usize, |offset, len| {
118 let start = *offset;
119 *offset += len;
120 Some(start)
121 })
122 .collect()
123}
124
/// Aggregate statistics over a manifest's fragments, as produced by
/// `Manifest::summary`.
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of the data-file sizes that are known.
    pub total_files_size: u64,
    /// Number of fragments that have a deletion file.
    pub total_deletion_files: u64,
    /// Rows physically stored in data files (live rows plus deleted rows).
    pub total_data_file_rows: u64,
    /// Rows recorded as deleted by deletion files.
    pub total_deletion_file_rows: u64,
    /// Live rows across all fragments.
    pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
    /// Renders each statistic as a decimal string keyed by its field name.
    fn from(summary: ManifestSummary) -> Self {
        let entries = [
            ("total_fragments", summary.total_fragments),
            ("total_data_files", summary.total_data_files),
            ("total_files_size", summary.total_files_size),
            ("total_deletion_files", summary.total_deletion_files),
            ("total_data_file_rows", summary.total_data_file_rows),
            ("total_deletion_file_rows", summary.total_deletion_file_rows),
            ("total_rows", summary.total_rows),
        ];
        entries
            .into_iter()
            .map(|(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }
}
167
168impl Manifest {
169 pub fn new(
170 schema: Schema,
171 fragments: Arc<Vec<Fragment>>,
172 data_storage_format: DataStorageFormat,
173 base_paths: HashMap<u32, BasePath>,
174 ) -> Self {
175 let fragment_offsets = compute_fragment_offsets(&fragments);
176
177 Self {
178 schema,
179 version: 1,
180 branch: None,
181 writer_version: Some(WriterVersion::default()),
182 fragments,
183 version_aux_data: 0,
184 index_section: None,
185 timestamp_nanos: 0,
186 tag: None,
187 reader_feature_flags: 0,
188 writer_feature_flags: 0,
189 max_fragment_id: None,
190 transaction_file: None,
191 transaction_section: None,
192 fragment_offsets,
193 next_row_id: 0,
194 data_storage_format,
195 config: HashMap::new(),
196 table_metadata: HashMap::new(),
197 base_paths,
198 }
199 }
200
201 pub fn new_from_previous(
202 previous: &Self,
203 schema: Schema,
204 fragments: Arc<Vec<Fragment>>,
205 ) -> Self {
206 let fragment_offsets = compute_fragment_offsets(&fragments);
207
208 Self {
209 schema,
210 version: previous.version + 1,
211 branch: previous.branch.clone(),
212 writer_version: Some(WriterVersion::default()),
213 fragments,
214 version_aux_data: 0,
215 index_section: None, timestamp_nanos: 0, tag: None,
218 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
221 transaction_file: None,
222 transaction_section: None,
223 fragment_offsets,
224 next_row_id: previous.next_row_id,
225 data_storage_format: previous.data_storage_format.clone(),
226 config: previous.config.clone(),
227 table_metadata: previous.table_metadata.clone(),
228 base_paths: previous.base_paths.clone(),
229 }
230 }
231
232 pub fn shallow_clone(
237 &self,
238 ref_name: Option<String>,
239 ref_path: String,
240 ref_base_id: u32,
241 branch_name: Option<String>,
242 transaction_file: String,
243 ) -> Self {
244 let cloned_fragments = self
245 .fragments
246 .as_ref()
247 .iter()
248 .map(|fragment| {
249 let mut cloned_fragment = fragment.clone();
250 for file in &mut cloned_fragment.files {
251 if file.base_id.is_none() {
252 file.base_id = Some(ref_base_id);
253 }
254 }
255
256 if let Some(deletion) = &mut cloned_fragment.deletion_file
257 && deletion.base_id.is_none()
258 {
259 deletion.base_id = Some(ref_base_id);
260 }
261 cloned_fragment
262 })
263 .collect::<Vec<_>>();
264
265 Self {
266 schema: self.schema.clone(),
267 version: self.version,
268 branch: branch_name,
269 writer_version: self.writer_version.clone(),
270 fragments: Arc::new(cloned_fragments),
271 version_aux_data: self.version_aux_data,
272 index_section: None, timestamp_nanos: self.timestamp_nanos,
274 tag: None,
275 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: self.max_fragment_id,
278 transaction_file: Some(transaction_file),
279 transaction_section: None,
280 fragment_offsets: self.fragment_offsets.clone(),
281 next_row_id: self.next_row_id,
282 data_storage_format: self.data_storage_format.clone(),
283 config: self.config.clone(),
284 base_paths: {
285 let mut base_paths = self.base_paths.clone();
286 let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
287 base_paths.insert(ref_base_id, base_path);
288 base_paths
289 },
290 table_metadata: self.table_metadata.clone(),
291 }
292 }
293
294 pub fn timestamp(&self) -> DateTime<Utc> {
296 let nanos = self.timestamp_nanos % 1_000_000_000;
297 let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
298 Utc.from_utc_datetime(
299 &DateTime::from_timestamp(seconds, nanos as u32)
300 .unwrap_or_default()
301 .naive_utc(),
302 )
303 }
304
305 pub fn set_timestamp(&mut self, nanos: u128) {
307 self.timestamp_nanos = nanos;
308 }
309
310 pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
312 &mut self.config
313 }
314
315 pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
317 &mut self.table_metadata
318 }
319
320 pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
322 &mut self.schema.metadata
323 }
324
325 pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
329 self.schema
330 .field_by_id_mut(field_id)
331 .map(|field| &mut field.metadata)
332 }
333
334 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
336 pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
337 self.config.extend(upsert_values);
338 }
339
340 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
342 pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
343 self.config
344 .retain(|key, _| !delete_keys.contains(&key.as_str()));
345 }
346
347 #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
349 pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
350 self.schema.metadata = new_metadata;
351 }
352
353 #[deprecated(
357 note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
358 )]
359 pub fn replace_field_metadata(
360 &mut self,
361 field_id: i32,
362 new_metadata: HashMap<String, String>,
363 ) -> Result<()> {
364 if let Some(field) = self.schema.field_by_id_mut(field_id) {
365 field.metadata = new_metadata;
366 Ok(())
367 } else {
368 Err(Error::invalid_input(format!(
369 "Field with id {} does not exist for replace_field_metadata",
370 field_id
371 )))
372 }
373 }
374
375 pub fn update_max_fragment_id(&mut self) {
377 if self.fragments.is_empty() {
379 return;
380 }
381
382 let max_fragment_id = self
383 .fragments
384 .iter()
385 .map(|f| f.id)
386 .max()
387 .unwrap() .try_into()
389 .unwrap();
390
391 match self.max_fragment_id {
392 None => {
393 self.max_fragment_id = Some(max_fragment_id);
395 }
396 Some(current_max) => {
397 if max_fragment_id > current_max {
400 self.max_fragment_id = Some(max_fragment_id);
401 }
402 }
403 }
404 }
405
406 pub fn max_fragment_id(&self) -> Option<u64> {
411 if let Some(max_id) = self.max_fragment_id {
412 Some(max_id.into())
414 } else {
415 self.fragments.iter().map(|f| f.id).max()
417 }
418 }
419
420 pub fn max_field_id(&self) -> i32 {
425 let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
426 let fragment_max_id = self
427 .fragments
428 .iter()
429 .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
430 .max()
431 .copied();
432 let fragment_max_id = fragment_max_id.unwrap_or(-1);
433 schema_max_id.max(fragment_max_id)
434 }
435
436 pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
439 if since.version >= self.version {
440 return Err(Error::invalid_input(format!(
441 "fragments_since: given version {} is newer than manifest version {}",
442 since.version, self.version
443 )));
444 }
445 let start = since.max_fragment_id();
446 Ok(self
447 .fragments
448 .iter()
449 .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
450 .cloned()
451 .collect())
452 }
453
454 pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
470 let start = range.start;
471 let end = range.end;
472 let idx = self
473 .fragment_offsets
474 .binary_search(&start)
475 .unwrap_or_else(|idx| idx - 1);
476
477 let mut fragments = vec![];
478 for i in idx..self.fragments.len() {
479 if self.fragment_offsets[i] >= end
480 || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
481 <= start
482 {
483 break;
484 }
485 fragments.push((self.fragment_offsets[i], &self.fragments[i]));
486 }
487
488 fragments
489 }
490
491 pub fn uses_stable_row_ids(&self) -> bool {
493 self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
494 }
495
496 pub fn serialized(&self) -> Vec<u8> {
499 let pb_manifest: pb::Manifest = self.into();
500 pb_manifest.encode_to_vec()
501 }
502
503 pub fn should_use_legacy_format(&self) -> bool {
504 self.data_storage_format.version == LEGACY_FORMAT_VERSION
505 }
506
507 pub fn summary(&self) -> ManifestSummary {
518 let mut summary =
520 self.fragments
521 .iter()
522 .fold(ManifestSummary::default(), |mut summary, f| {
523 summary.total_data_files += f.files.len() as u64;
525 if let Some(num_rows) = f.num_rows() {
527 summary.total_rows += num_rows as u64;
528 }
529 for data_file in &f.files {
531 if let Some(size_bytes) = data_file.file_size_bytes.get() {
532 summary.total_files_size += size_bytes.get();
533 }
534 }
535 if f.deletion_file.is_some() {
537 summary.total_deletion_files += 1;
538 }
539 if let Some(deletion_file) = &f.deletion_file
541 && let Some(num_deleted) = deletion_file.num_deleted_rows
542 {
543 summary.total_deletion_file_rows += num_deleted as u64;
544 }
545 summary
546 });
547 summary.total_fragments = self.fragments.len() as u64;
548 summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
549
550 summary
551 }
552}
553
/// A registered base location. Data and deletion files refer to it through
/// their `base_id`, and `Manifest::base_paths` keys it by `id`.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Identifier referenced by files' `base_id`.
    pub id: u32,
    /// Optional human-readable name.
    pub name: Option<String>,
    /// Whether `path` is the root of a dataset (`Manifest::shallow_clone`
    /// registers its reference with `true`).
    pub is_dataset_root: bool,
    /// Location as a URI string; see `extract_path`.
    pub path: String,
}
562
563impl BasePath {
564 pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
573 Self {
574 id,
575 name,
576 is_dataset_root,
577 path,
578 }
579 }
580
581 pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
585 ObjectStore::extract_path_from_uri(registry, &self.path)
586 }
587}
588
589impl DeepSizeOf for BasePath {
590 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
591 self.name.deep_size_of_children(context)
592 + self.path.deep_size_of_children(context) * 2
593 + size_of::<bool>()
594 }
595}
596
/// Identifies the library and version that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Writer library name; `lance_lib_version` only interprets `"lance"`.
    pub library: String,
    /// Version string. `WriterVersion::default` stores only
    /// `major.minor.patch` here and keeps the other semver parts in the
    /// fields below.
    pub version: String,
    /// Semver pre-release component (without the leading `-`), if any.
    pub prerelease: Option<String>,
    /// Semver build-metadata component (without the leading `+`), if any.
    pub build_metadata: Option<String>,
}
604
/// File format name and format version used for a dataset's data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// Format name; `DataStorageFormat::new` always records `"lance"`.
    pub file_format: String,
    /// Format version in string form; parsed back by `lance_file_version`.
    pub version: String,
}
610
/// File-format name recorded by `DataStorageFormat::new`.
const LANCE_FORMAT_NAME: &str = "lance";
612
613impl DataStorageFormat {
614 pub fn new(version: LanceFileVersion) -> Self {
615 Self {
616 file_format: LANCE_FORMAT_NAME.to_string(),
617 version: version.resolve().to_string(),
618 }
619 }
620
621 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
622 self.version.parse::<LanceFileVersion>()
623 }
624}
625
626impl Default for DataStorageFormat {
627 fn default() -> Self {
628 Self::new(LanceFileVersion::default())
629 }
630}
631
632impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
633 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
634 Self {
635 file_format: pb.file_format,
636 version: pb.version,
637 }
638 }
639}
640
/// Which component of a semantic version `bump_version` increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// Increment `major`; minor and patch reset to 0.
    Major,
    /// Increment `minor`; patch resets to 0.
    Minor,
    /// Increment `patch`.
    Patch,
}
647
648fn bump_version(version: &mut semver::Version, part: VersionPart) {
649 match part {
650 VersionPart::Major => {
651 version.major += 1;
652 version.minor = 0;
653 version.patch = 0;
654 }
655 VersionPart::Minor => {
656 version.minor += 1;
657 version.patch = 0;
658 }
659 VersionPart::Patch => {
660 version.patch += 1;
661 }
662 }
663}
664
665impl WriterVersion {
666 fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
676 let mut parsed = semver::Version::parse(full_version).ok()?;
677
678 let prerelease = if parsed.pre.is_empty() {
679 None
680 } else {
681 Some(parsed.pre.to_string())
682 };
683
684 let build_metadata = if parsed.build.is_empty() {
685 None
686 } else {
687 Some(parsed.build.to_string())
688 };
689
690 parsed.pre = semver::Prerelease::EMPTY;
692 parsed.build = semver::BuildMetadata::EMPTY;
693 Some((parsed.to_string(), prerelease, build_metadata))
694 }
695
696 #[deprecated(note = "Use `lance_lib_version()` instead")]
699 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
700 let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
702 (
703 &self.version[..dash_idx],
704 Some(&self.version[dash_idx + 1..]),
705 )
706 } else {
707 (self.version.as_str(), None)
708 };
709
710 let mut parts = version_part.split('.');
711 let major = parts.next().unwrap_or("0").parse().ok()?;
712 let minor = parts.next().unwrap_or("0").parse().ok()?;
713 let patch = parts.next().unwrap_or("0").parse().ok()?;
714
715 Some((major, minor, patch, tag))
716 }
717
718 pub fn lance_lib_version(&self) -> Option<semver::Version> {
726 if self.library != "lance" {
727 return None;
728 }
729
730 let mut version = semver::Version::parse(&self.version).ok()?;
731
732 if let Some(ref prerelease) = self.prerelease {
733 version.pre = semver::Prerelease::new(prerelease).ok()?;
734 }
735
736 if let Some(ref build_metadata) = self.build_metadata {
737 version.build = semver::BuildMetadata::new(build_metadata).ok()?;
738 }
739
740 Some(version)
741 }
742
743 #[deprecated(
744 note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
745 )]
746 #[allow(deprecated)]
747 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
748 self.semver()
749 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
750 }
751
752 #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
758 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
759 let version = self
760 .lance_lib_version()
761 .expect("Not lance library or invalid version");
762 let other = semver::Version {
763 major: major.into(),
764 minor: minor.into(),
765 patch: patch.into(),
766 pre: semver::Prerelease::EMPTY,
767 build: semver::BuildMetadata::EMPTY,
768 };
769 version < other
770 }
771
772 #[deprecated(note = "This is meant for testing and will be made private in future version.")]
773 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
774 let mut version = self.lance_lib_version().expect("Should be lance version");
775 bump_version(&mut version, part);
776 if !keep_tag {
777 version.pre = semver::Prerelease::EMPTY;
778 }
779 let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
780 .expect("Bumped version should be valid semver");
781 Self {
782 library: self.library.clone(),
783 version: clean_version,
784 prerelease,
785 build_metadata,
786 }
787 }
788}
789
790impl Default for WriterVersion {
791 #[cfg(not(test))]
792 fn default() -> Self {
793 let full_version = env!("CARGO_PKG_VERSION");
794 let (version, prerelease, build_metadata) =
795 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
796 Self {
797 library: "lance".to_string(),
798 version,
799 prerelease,
800 build_metadata,
801 }
802 }
803
804 #[cfg(test)]
806 #[allow(deprecated)]
807 fn default() -> Self {
808 let full_version = env!("CARGO_PKG_VERSION");
809 let (version, prerelease, build_metadata) =
810 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
811 Self {
812 library: "lance".to_string(),
813 version,
814 prerelease,
815 build_metadata,
816 }
817 .bump(VersionPart::Patch, true)
818 }
819}
820
821impl ProtoStruct for Manifest {
822 type Proto = pb::Manifest;
823}
824
825impl From<pb::BasePath> for BasePath {
826 fn from(p: pb::BasePath) -> Self {
827 Self::new(p.id, p.path, p.name, p.is_dataset_root)
828 }
829}
830
831impl From<BasePath> for pb::BasePath {
832 fn from(p: BasePath) -> Self {
833 Self {
834 id: p.id,
835 name: p.name,
836 is_dataset_root: p.is_dataset_root,
837 path: p.path,
838 }
839 }
840}
841
842impl TryFrom<pb::Manifest> for Manifest {
843 type Error = Error;
844
845 fn try_from(p: pb::Manifest) -> Result<Self> {
846 let timestamp_nanos = p.timestamp.map(|ts| {
847 let sec = ts.seconds as u128 * 1e9 as u128;
848 let nanos = ts.nanos as u128;
849 sec + nanos
850 });
851 let writer_version = match p.writer_version {
853 Some(pb::manifest::WriterVersion {
854 library,
855 version,
856 prerelease,
857 build_metadata,
858 }) => Some(WriterVersion {
859 library,
860 version,
861 prerelease,
862 build_metadata,
863 }),
864 _ => None,
865 };
866 let fragments = Arc::new(
867 p.fragments
868 .into_iter()
869 .map(Fragment::try_from)
870 .collect::<Result<Vec<_>>>()?,
871 );
872 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
873 let fields_with_meta = FieldsWithMeta {
874 fields: Fields(p.fields),
875 metadata: p.schema_metadata,
876 };
877
878 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
879 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
880 {
881 return Err(Error::internal("All fragments must have row ids"));
882 }
883
884 let data_storage_format = match p.data_format {
885 None => {
886 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
887 DataStorageFormat::new(inferred_version)
889 } else {
890 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
892 DataStorageFormat::new(LanceFileVersion::Stable)
893 } else {
894 DataStorageFormat::new(LanceFileVersion::Legacy)
895 }
896 }
897 }
898 Some(format) => DataStorageFormat::from(format),
899 };
900
901 let schema = Schema::from(fields_with_meta);
902
903 Ok(Self {
904 schema,
905 version: p.version,
906 branch: p.branch,
907 writer_version,
908 version_aux_data: p.version_aux_data as usize,
909 index_section: p.index_section.map(|i| i as usize),
910 timestamp_nanos: timestamp_nanos.unwrap_or(0),
911 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
912 reader_feature_flags: p.reader_feature_flags,
913 writer_feature_flags: p.writer_feature_flags,
914 max_fragment_id: p.max_fragment_id,
915 fragments,
916 transaction_file: if p.transaction_file.is_empty() {
917 None
918 } else {
919 Some(p.transaction_file)
920 },
921 transaction_section: p.transaction_section.map(|i| i as usize),
922 fragment_offsets,
923 next_row_id: p.next_row_id,
924 data_storage_format,
925 config: p.config,
926 table_metadata: p.table_metadata,
927 base_paths: p
928 .base_paths
929 .iter()
930 .map(|item| (item.id, item.clone().into()))
931 .collect(),
932 })
933 }
934}
935
936impl From<&Manifest> for pb::Manifest {
937 fn from(m: &Manifest) -> Self {
938 let timestamp_nanos = if m.timestamp_nanos == 0 {
939 None
940 } else {
941 let nanos = m.timestamp_nanos % 1e9 as u128;
942 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
943 Some(Timestamp {
944 seconds,
945 nanos: nanos as i32,
946 })
947 };
948 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
949 Self {
950 fields: fields_with_meta.fields.0,
951 schema_metadata: m
952 .schema
953 .metadata
954 .iter()
955 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
956 .collect(),
957 version: m.version,
958 branch: m.branch.clone(),
959 writer_version: m
960 .writer_version
961 .as_ref()
962 .map(|wv| pb::manifest::WriterVersion {
963 library: wv.library.clone(),
964 version: wv.version.clone(),
965 prerelease: wv.prerelease.clone(),
966 build_metadata: wv.build_metadata.clone(),
967 }),
968 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
969 table_metadata: m.table_metadata.clone(),
970 version_aux_data: m.version_aux_data as u64,
971 index_section: m.index_section.map(|i| i as u64),
972 timestamp: timestamp_nanos,
973 tag: m.tag.clone().unwrap_or_default(),
974 reader_feature_flags: m.reader_feature_flags,
975 writer_feature_flags: m.writer_feature_flags,
976 max_fragment_id: m.max_fragment_id,
977 transaction_file: m.transaction_file.clone().unwrap_or_default(),
978 next_row_id: m.next_row_id,
979 data_format: Some(pb::manifest::DataStorageFormat {
980 file_format: m.data_storage_format.file_format.clone(),
981 version: m.data_storage_format.version.clone(),
982 }),
983 config: m.config.clone(),
984 base_paths: m
985 .base_paths
986 .values()
987 .map(|base_path| pb::BasePath {
988 id: base_path.id,
989 name: base_path.name.clone(),
990 is_dataset_root: base_path.is_dataset_root,
991 path: base_path.path.clone(),
992 })
993 .collect(),
994 transaction_section: m.transaction_section.map(|i| i as u64),
995 }
996 }
997}
998
999#[async_trait]
1000pub trait SelfDescribingFileReader {
1001 async fn try_new_self_described(
1009 object_store: &ObjectStore,
1010 path: &Path,
1011 cache: Option<&LanceCache>,
1012 ) -> Result<Self>
1013 where
1014 Self: Sized,
1015 {
1016 let reader = object_store.open(path).await?;
1017 Self::try_new_self_described_from_reader(reader.into(), cache).await
1018 }
1019
1020 async fn try_new_self_described_from_reader(
1021 reader: Arc<dyn Reader>,
1022 cache: Option<&LanceCache>,
1023 ) -> Result<Self>
1024 where
1025 Self: Sized;
1026}
1027
1028#[async_trait]
1029impl SelfDescribingFileReader for PreviousFileReader {
1030 async fn try_new_self_described_from_reader(
1031 reader: Arc<dyn Reader>,
1032 cache: Option<&LanceCache>,
1033 ) -> Result<Self> {
1034 let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
1035 let manifest_position = metadata.manifest_position.ok_or(Error::internal(format!(
1036 "Attempt to open file at {} as self-describing but it did not contain a manifest",
1037 reader.path(),
1038 )))?;
1039 let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
1040 if manifest.should_use_legacy_format() {
1041 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
1042 }
1043 let schema = manifest.schema;
1044 let max_field_id = schema.max_field_id().unwrap_or_default();
1045 Self::try_new_from_reader(
1046 reader.path(),
1047 reader.clone(),
1048 Some(metadata),
1049 schema,
1050 0,
1051 0,
1052 max_field_id,
1053 cache,
1054 )
1055 .await
1056 }
1057}
1058
1059#[cfg(test)]
1060mod tests {
1061 use crate::format::{DataFile, DeletionFile, DeletionFileType};
1062 use std::num::NonZero;
1063
1064 use super::*;
1065
1066 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1067 use lance_core::datatypes::Field;
1068
1069 #[test]
1070 fn test_writer_version() {
1071 let wv = WriterVersion::default();
1072 assert_eq!(wv.library, "lance");
1073
1074 let cargo_version = env!("CARGO_PKG_VERSION");
1076 let expected_tag = if cargo_version.contains('-') {
1077 Some(cargo_version.split('-').nth(1).unwrap())
1078 } else {
1079 None
1080 };
1081
1082 let version_parts: Vec<&str> = wv.version.split('.').collect();
1084 assert_eq!(
1085 version_parts.len(),
1086 3,
1087 "Version should be major.minor.patch"
1088 );
1089 assert!(
1090 !wv.version.contains('-'),
1091 "Version field should not contain prerelease"
1092 );
1093
1094 assert_eq!(wv.prerelease.as_deref(), expected_tag);
1096 assert_eq!(wv.build_metadata, None);
1098
1099 let version = wv.lance_lib_version().unwrap();
1101 assert_eq!(
1102 version.major,
1103 env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
1104 );
1105 assert_eq!(
1106 version.minor,
1107 env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
1108 );
1109 assert_eq!(
1110 version.patch,
1111 env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
1113 );
1114 assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));
1115
1116 for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
1117 let mut bumped_version = version.clone();
1118 bump_version(&mut bumped_version, *part);
1119 assert!(version < bumped_version);
1120 }
1121 }
1122
    /// `split_version` separates major.minor.patch from the pre-release and
    /// build-metadata parts, and rejects non-semver input.
    #[test]
    fn test_writer_version_split() {
        // Pre-release only.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, None);

        // Plain release.
        let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, None);

        // Pre-release and build metadata.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, Some("rc.1".to_string()));
        assert_eq!(build_metadata, Some("build.123".to_string()));

        // Build metadata only.
        let (version, prerelease, build_metadata) =
            WriterVersion::split_version("2.0.0+build.123").unwrap();
        assert_eq!(version, "2.0.0");
        assert_eq!(prerelease, None);
        assert_eq!(build_metadata, Some("build.123".to_string()));

        assert!(WriterVersion::split_version("not-a-version").is_none());
    }
1155
    /// Under semver ordering a pre-release (`2.0.0-rc.1`) sorts before the
    /// release (`2.0.0`).
    #[test]
    fn test_writer_version_comparison_with_prerelease() {
        let v1 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        let v2 = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        let semver1 = v1.lance_lib_version().unwrap();
        let semver2 = v2.lance_lib_version().unwrap();

        assert!(semver1 < semver2);
    }
1178
    /// `lance_lib_version` reattaches the stored pre-release and build
    /// metadata to the core version.
    #[test]
    fn test_writer_version_with_build_metadata() {
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: Some("build.123".to_string()),
        };

        let semver = v.lance_lib_version().unwrap();
        assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
        assert_eq!(semver.major, 2);
        assert_eq!(semver.minor, 0);
        assert_eq!(semver.patch, 0);
        assert_eq!(semver.pre.as_str(), "rc.1");
        assert_eq!(semver.build.as_str(), "build.123");
    }
1196
    /// A non-semver version string yields `None` from `lance_lib_version`
    /// while the raw fields stay readable.
    #[test]
    fn test_writer_version_non_semver() {
        let v = WriterVersion {
            library: "lance".to_string(),
            version: "custom-build-v1".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        assert!(v.lance_lib_version().is_none());

        // The raw fields are still accessible.
        assert_eq!(v.library, "lance");
        assert_eq!(v.version, "custom-build-v1");
    }
1214
    /// `older_than` follows semver ordering, so a pre-release counts as older
    /// than its own release.
    #[test]
    // Exercises the deprecated `older_than`.
    #[allow(deprecated)]
    fn test_older_than_with_prerelease() {
        let v_rc = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: Some("rc.1".to_string()),
            build_metadata: None,
        };

        // 2.0.0-rc.1 < 2.0.0
        assert!(v_rc.older_than(2, 0, 0));

        // 2.0.0-rc.1 < 2.0.1
        assert!(v_rc.older_than(2, 0, 1));

        // 2.0.0-rc.1 > 1.9.9
        assert!(!v_rc.older_than(1, 9, 9));

        let v_release = WriterVersion {
            library: "lance".to_string(),
            version: "2.0.0".to_string(),
            prerelease: None,
            build_metadata: None,
        };

        // Equal versions are not "older".
        assert!(!v_release.older_than(2, 0, 0));

        assert!(v_release.older_than(2, 0, 1));
    }
1248
    /// `fragments_by_offset_range` returns every fragment overlapping the row
    /// range, paired with the fragment's starting row offset.
    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        // Per the assertions below, these fragments hold 10, 15 and 20 rows and
        // start at offsets 0, 10 and 25 (45 rows in total).
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        // Exactly the first fragment.
        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        // Straddles the boundary between fragments 0 and 1.
        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        // Starts inside fragment 1 and runs past the last row.
        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges starting at or beyond the total row count select nothing.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
1294
    /// `max_field_id` must also consider field ids referenced only by data
    /// files, not just the schema.
    #[test]
    fn test_max_field_id() {
        // NOTE(review): `set_id` presumably assigns ids 0 and 2 here — confirm.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        // Fragment 1's "path2" data file references field id 43, which the
        // assertion below expects to be the overall maximum.
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields(
                    "path1",
                    vec![0, 1, 2],
                    None,
                )],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                    DataFile::new_legacy_from_fields("path3", vec![2], None),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
                created_at_version_meta: None,
                last_updated_at_version_meta: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            HashMap::new(),
        );

        assert_eq!(manifest.max_field_id(), 43);
    }
1346
1347 #[test]
1348 fn test_config() {
1349 let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1350 "a",
1351 arrow_schema::DataType::Int64,
1352 false,
1353 )]);
1354 let schema = Schema::try_from(&arrow_schema).unwrap();
1355 let fragments = vec![
1356 Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1357 Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1358 Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1359 ];
1360 let mut manifest = Manifest::new(
1361 schema,
1362 Arc::new(fragments),
1363 DataStorageFormat::default(),
1364 HashMap::new(),
1365 );
1366
1367 let mut config = manifest.config.clone();
1368 config.insert("lance.test".to_string(), "value".to_string());
1369 config.insert("other-key".to_string(), "other-value".to_string());
1370
1371 manifest.config_mut().extend(config.clone());
1372 assert_eq!(manifest.config, config.clone());
1373
1374 config.remove("other-key");
1375 manifest.config_mut().remove("other-key");
1376 assert_eq!(manifest.config, config);
1377 }
1378
1379 #[test]
1380 fn test_manifest_summary() {
1381 let arrow_schema = ArrowSchema::new(vec![
1383 ArrowField::new("id", arrow_schema::DataType::Int64, false),
1384 ArrowField::new("name", arrow_schema::DataType::Utf8, true),
1385 ]);
1386 let schema = Schema::try_from(&arrow_schema).unwrap();
1387
1388 let empty_manifest = Manifest::new(
1389 schema.clone(),
1390 Arc::new(vec![]),
1391 DataStorageFormat::default(),
1392 HashMap::new(),
1393 );
1394
1395 let empty_summary = empty_manifest.summary();
1396 assert_eq!(empty_summary.total_rows, 0);
1397 assert_eq!(empty_summary.total_files_size, 0);
1398 assert_eq!(empty_summary.total_fragments, 0);
1399 assert_eq!(empty_summary.total_data_files, 0);
1400 assert_eq!(empty_summary.total_deletion_file_rows, 0);
1401 assert_eq!(empty_summary.total_data_file_rows, 0);
1402 assert_eq!(empty_summary.total_deletion_files, 0);
1403
1404 let empty_fragments = vec![
1406 Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
1407 Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
1408 ];
1409
1410 let empty_files_manifest = Manifest::new(
1411 schema.clone(),
1412 Arc::new(empty_fragments),
1413 DataStorageFormat::default(),
1414 HashMap::new(),
1415 );
1416
1417 let empty_files_summary = empty_files_manifest.summary();
1418 assert_eq!(empty_files_summary.total_rows, 0);
1419 assert_eq!(empty_files_summary.total_files_size, 0);
1420 assert_eq!(empty_files_summary.total_fragments, 2);
1421 assert_eq!(empty_files_summary.total_data_files, 2);
1422 assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
1423 assert_eq!(empty_files_summary.total_data_file_rows, 0);
1424 assert_eq!(empty_files_summary.total_deletion_files, 0);
1425
1426 let real_fragments = vec![
1428 Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
1429 Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
1430 Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
1431 ];
1432
1433 let real_data_manifest = Manifest::new(
1434 schema.clone(),
1435 Arc::new(real_fragments),
1436 DataStorageFormat::default(),
1437 HashMap::new(),
1438 );
1439
1440 let real_data_summary = real_data_manifest.summary();
1441 assert_eq!(real_data_summary.total_rows, 425); assert_eq!(real_data_summary.total_files_size, 0); assert_eq!(real_data_summary.total_fragments, 3);
1444 assert_eq!(real_data_summary.total_data_files, 3);
1445 assert_eq!(real_data_summary.total_deletion_file_rows, 0);
1446 assert_eq!(real_data_summary.total_data_file_rows, 425);
1447 assert_eq!(real_data_summary.total_deletion_files, 0);
1448
1449 let file_version = LanceFileVersion::default();
1450 let mut fragment_with_deletion = Fragment::new(0)
1452 .with_file(
1453 "data_with_deletion.lance",
1454 vec![0, 1],
1455 vec![0, 1],
1456 &file_version,
1457 NonZero::new(1000),
1458 )
1459 .with_physical_rows(50);
1460 fragment_with_deletion.deletion_file = Some(DeletionFile {
1461 read_version: 123,
1462 id: 456,
1463 file_type: DeletionFileType::Array,
1464 num_deleted_rows: Some(10),
1465 base_id: None,
1466 });
1467
1468 let manifest_with_deletion = Manifest::new(
1469 schema,
1470 Arc::new(vec![fragment_with_deletion]),
1471 DataStorageFormat::default(),
1472 HashMap::new(),
1473 );
1474
1475 let deletion_summary = manifest_with_deletion.summary();
1476 assert_eq!(deletion_summary.total_rows, 40); assert_eq!(deletion_summary.total_files_size, 1000);
1478 assert_eq!(deletion_summary.total_fragments, 1);
1479 assert_eq!(deletion_summary.total_data_files, 1);
1480 assert_eq!(deletion_summary.total_deletion_file_rows, 10);
1481 assert_eq!(deletion_summary.total_data_file_rows, 50);
1482 assert_eq!(deletion_summary.total_deletion_files, 1);
1483
1484 let stats_map: BTreeMap<String, String> = deletion_summary.into();
1486 assert_eq!(stats_map.len(), 7)
1487 }
1488}