1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{Fields, FieldsWithMeta, populate_schema_dictionary};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LEGACY_FORMAT_VERSION, LanceFileVersion};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{FLAG_STABLE_ROW_IDS, has_deprecated_v2_feature_flag};
20use crate::format::fragment::DataFileFieldInterner;
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::Schema;
24use lance_core::{Error, Result};
25use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
26use lance_io::utils::read_struct;
27
28#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
35pub struct Manifest {
36 pub schema: Schema,
38
39 pub version: u64,
41
42 pub branch: Option<String>,
44
45 pub writer_version: Option<WriterVersion>,
47
48 pub fragments: Arc<Vec<Fragment>>,
53
54 pub version_aux_data: usize,
56
57 pub index_section: Option<usize>,
59
60 pub timestamp_nanos: u128,
62
63 pub tag: Option<String>,
65
66 pub reader_feature_flags: u64,
68
69 pub writer_feature_flags: u64,
71
72 pub max_fragment_id: Option<u32>,
75
76 pub transaction_file: Option<String>,
78
79 pub transaction_section: Option<usize>,
81
82 fragment_offsets: Vec<usize>,
85
86 pub next_row_id: u64,
88
89 pub data_storage_format: DataStorageFormat,
91
92 pub config: HashMap<String, String>,
94
95 pub table_metadata: HashMap<String, String>,
101
102 pub base_paths: HashMap<u32, BasePath>,
104}
105
/// Mask selecting the most significant bit of a `u64` version number.
pub const DETACHED_VERSION_MASK: u64 = 1 << 63;

/// Returns `true` when the bit selected by [`DETACHED_VERSION_MASK`] is set in
/// `version`.
pub fn is_detached_version(version: u64) -> bool {
    (version & DETACHED_VERSION_MASK) != 0
}
112
113fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
114 fragments
115 .iter()
116 .map(|f| f.num_rows().unwrap_or_default())
117 .chain([0]) .scan(0_usize, |offset, len| {
119 let start = *offset;
120 *offset += len;
121 Some(start)
122 })
123 .collect()
124}
125
/// Aggregate counters describing the contents of a manifest.
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of the known data file sizes, in bytes.
    pub total_files_size: u64,
    /// Number of fragments that carry a deletion file.
    pub total_deletion_files: u64,
    /// `total_rows` plus `total_deletion_file_rows`.
    pub total_data_file_rows: u64,
    /// Sum of the deleted-row counts recorded in deletion files.
    pub total_deletion_file_rows: u64,
    /// Sum of the fragments' row counts.
    pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
    /// Renders every counter as a decimal string keyed by its field name.
    fn from(summary: ManifestSummary) -> Self {
        let entries = [
            ("total_fragments", summary.total_fragments),
            ("total_data_files", summary.total_data_files),
            ("total_files_size", summary.total_files_size),
            ("total_deletion_files", summary.total_deletion_files),
            ("total_data_file_rows", summary.total_data_file_rows),
            ("total_deletion_file_rows", summary.total_deletion_file_rows),
            ("total_rows", summary.total_rows),
        ];
        entries
            .iter()
            .map(|&(key, value)| (key.to_string(), value.to_string()))
            .collect()
    }
}
168
169impl Manifest {
170 pub fn new(
171 schema: Schema,
172 fragments: Arc<Vec<Fragment>>,
173 data_storage_format: DataStorageFormat,
174 base_paths: HashMap<u32, BasePath>,
175 ) -> Self {
176 let fragment_offsets = compute_fragment_offsets(&fragments);
177
178 Self {
179 schema,
180 version: 1,
181 branch: None,
182 writer_version: Some(WriterVersion::default()),
183 fragments,
184 version_aux_data: 0,
185 index_section: None,
186 timestamp_nanos: 0,
187 tag: None,
188 reader_feature_flags: 0,
189 writer_feature_flags: 0,
190 max_fragment_id: None,
191 transaction_file: None,
192 transaction_section: None,
193 fragment_offsets,
194 next_row_id: 0,
195 data_storage_format,
196 config: HashMap::new(),
197 table_metadata: HashMap::new(),
198 base_paths,
199 }
200 }
201
202 pub fn new_from_previous(
203 previous: &Self,
204 schema: Schema,
205 fragments: Arc<Vec<Fragment>>,
206 ) -> Self {
207 let fragment_offsets = compute_fragment_offsets(&fragments);
208
209 Self {
210 schema,
211 version: previous.version + 1,
212 branch: previous.branch.clone(),
213 writer_version: Some(WriterVersion::default()),
214 fragments,
215 version_aux_data: 0,
216 index_section: None, timestamp_nanos: 0, tag: None,
219 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
222 transaction_file: None,
223 transaction_section: None,
224 fragment_offsets,
225 next_row_id: previous.next_row_id,
226 data_storage_format: previous.data_storage_format.clone(),
227 config: previous.config.clone(),
228 table_metadata: previous.table_metadata.clone(),
229 base_paths: previous.base_paths.clone(),
230 }
231 }
232
233 pub fn shallow_clone(
238 &self,
239 ref_name: Option<String>,
240 ref_path: String,
241 ref_base_id: u32,
242 branch_name: Option<String>,
243 transaction_file: String,
244 ) -> Self {
245 let cloned_fragments = self
246 .fragments
247 .as_ref()
248 .iter()
249 .map(|fragment| {
250 let mut cloned_fragment = fragment.clone();
251 for file in &mut cloned_fragment.files {
252 if file.base_id.is_none() {
253 file.base_id = Some(ref_base_id);
254 }
255 }
256
257 if let Some(deletion) = &mut cloned_fragment.deletion_file
258 && deletion.base_id.is_none()
259 {
260 deletion.base_id = Some(ref_base_id);
261 }
262 cloned_fragment
263 })
264 .collect::<Vec<_>>();
265
266 Self {
267 schema: self.schema.clone(),
268 version: self.version,
269 branch: branch_name,
270 writer_version: self.writer_version.clone(),
271 fragments: Arc::new(cloned_fragments),
272 version_aux_data: self.version_aux_data,
273 index_section: None, timestamp_nanos: self.timestamp_nanos,
275 tag: None,
276 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: self.max_fragment_id,
279 transaction_file: Some(transaction_file),
280 transaction_section: None,
281 fragment_offsets: self.fragment_offsets.clone(),
282 next_row_id: self.next_row_id,
283 data_storage_format: self.data_storage_format.clone(),
284 config: self.config.clone(),
285 base_paths: {
286 let mut base_paths = self.base_paths.clone();
287 let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
288 base_paths.insert(ref_base_id, base_path);
289 base_paths
290 },
291 table_metadata: self.table_metadata.clone(),
292 }
293 }
294
295 pub fn timestamp(&self) -> DateTime<Utc> {
297 let nanos = self.timestamp_nanos % 1_000_000_000;
298 let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
299 Utc.from_utc_datetime(
300 &DateTime::from_timestamp(seconds, nanos as u32)
301 .unwrap_or_default()
302 .naive_utc(),
303 )
304 }
305
306 pub fn set_timestamp(&mut self, nanos: u128) {
308 self.timestamp_nanos = nanos;
309 }
310
311 pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
313 &mut self.config
314 }
315
316 pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
318 &mut self.table_metadata
319 }
320
321 pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
323 &mut self.schema.metadata
324 }
325
326 pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
330 self.schema
331 .field_by_id_mut(field_id)
332 .map(|field| &mut field.metadata)
333 }
334
335 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
337 pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
338 self.config.extend(upsert_values);
339 }
340
341 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
343 pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
344 self.config
345 .retain(|key, _| !delete_keys.contains(&key.as_str()));
346 }
347
348 #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
350 pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
351 self.schema.metadata = new_metadata;
352 }
353
354 #[deprecated(
358 note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
359 )]
360 pub fn replace_field_metadata(
361 &mut self,
362 field_id: i32,
363 new_metadata: HashMap<String, String>,
364 ) -> Result<()> {
365 if let Some(field) = self.schema.field_by_id_mut(field_id) {
366 field.metadata = new_metadata;
367 Ok(())
368 } else {
369 Err(Error::invalid_input(format!(
370 "Field with id {} does not exist for replace_field_metadata",
371 field_id
372 )))
373 }
374 }
375
376 pub fn update_max_fragment_id(&mut self) {
378 if self.fragments.is_empty() {
380 return;
381 }
382
383 let max_fragment_id = self
384 .fragments
385 .iter()
386 .map(|f| f.id)
387 .max()
388 .unwrap() .try_into()
390 .unwrap();
391
392 match self.max_fragment_id {
393 None => {
394 self.max_fragment_id = Some(max_fragment_id);
396 }
397 Some(current_max) => {
398 if max_fragment_id > current_max {
401 self.max_fragment_id = Some(max_fragment_id);
402 }
403 }
404 }
405 }
406
407 pub fn max_fragment_id(&self) -> Option<u64> {
412 if let Some(max_id) = self.max_fragment_id {
413 Some(max_id.into())
415 } else {
416 self.fragments.iter().map(|f| f.id).max()
418 }
419 }
420
421 pub fn max_field_id(&self) -> i32 {
426 let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
427 let fragment_max_id = self
428 .fragments
429 .iter()
430 .flat_map(|f| f.files.iter().flat_map(|file| file.fields.iter()))
431 .max()
432 .copied();
433 let fragment_max_id = fragment_max_id.unwrap_or(-1);
434 schema_max_id.max(fragment_max_id)
435 }
436
437 pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
440 if since.version >= self.version {
441 return Err(Error::invalid_input(format!(
442 "fragments_since: given version {} is newer than manifest version {}",
443 since.version, self.version
444 )));
445 }
446 let start = since.max_fragment_id();
447 Ok(self
448 .fragments
449 .iter()
450 .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
451 .cloned()
452 .collect())
453 }
454
455 pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
471 let start = range.start;
472 let end = range.end;
473 let idx = self
474 .fragment_offsets
475 .binary_search(&start)
476 .unwrap_or_else(|idx| idx - 1);
477
478 let mut fragments = vec![];
479 for i in idx..self.fragments.len() {
480 if self.fragment_offsets[i] >= end
481 || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
482 <= start
483 {
484 break;
485 }
486 fragments.push((self.fragment_offsets[i], &self.fragments[i]));
487 }
488
489 fragments
490 }
491
492 pub fn uses_stable_row_ids(&self) -> bool {
494 self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
495 }
496
497 pub fn serialized(&self) -> Vec<u8> {
500 let pb_manifest: pb::Manifest = self.into();
501 pb_manifest.encode_to_vec()
502 }
503
504 pub fn should_use_legacy_format(&self) -> bool {
505 self.data_storage_format.version == LEGACY_FORMAT_VERSION
506 }
507
508 pub fn summary(&self) -> ManifestSummary {
519 let mut summary =
521 self.fragments
522 .iter()
523 .fold(ManifestSummary::default(), |mut summary, f| {
524 summary.total_data_files += f.files.len() as u64;
526 if let Some(num_rows) = f.num_rows() {
528 summary.total_rows += num_rows as u64;
529 }
530 for data_file in &f.files {
532 if let Some(size_bytes) = data_file.file_size_bytes.get() {
533 summary.total_files_size += size_bytes.get();
534 }
535 }
536 if f.deletion_file.is_some() {
538 summary.total_deletion_files += 1;
539 }
540 if let Some(deletion_file) = &f.deletion_file
542 && let Some(num_deleted) = deletion_file.num_deleted_rows
543 {
544 summary.total_deletion_file_rows += num_deleted as u64;
545 }
546 summary
547 });
548 summary.total_fragments = self.fragments.len() as u64;
549 summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
550
551 summary
552 }
553}
554
555#[derive(Debug, Clone, PartialEq)]
556pub struct BasePath {
557 pub id: u32,
558 pub name: Option<String>,
559 pub is_dataset_root: bool,
560 pub path: String,
562}
563
564impl BasePath {
565 pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
574 Self {
575 id,
576 name,
577 is_dataset_root,
578 path,
579 }
580 }
581
582 pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
586 ObjectStore::extract_path_from_uri(registry, &self.path)
587 }
588}
589
590impl DeepSizeOf for BasePath {
591 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
592 self.name.deep_size_of_children(context)
593 + self.path.deep_size_of_children(context) * 2
594 + size_of::<bool>()
595 }
596}
597
598#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
599pub struct WriterVersion {
600 pub library: String,
601 pub version: String,
602 pub prerelease: Option<String>,
603 pub build_metadata: Option<String>,
604}
605
606#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
607pub struct DataStorageFormat {
608 pub file_format: String,
609 pub version: String,
610}
611
612const LANCE_FORMAT_NAME: &str = "lance";
613
614impl DataStorageFormat {
615 pub fn new(version: LanceFileVersion) -> Self {
616 Self {
617 file_format: LANCE_FORMAT_NAME.to_string(),
618 version: version.resolve().to_string(),
619 }
620 }
621
622 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
623 self.version.parse::<LanceFileVersion>()
624 }
625}
626
627impl Default for DataStorageFormat {
628 fn default() -> Self {
629 Self::new(LanceFileVersion::default())
630 }
631}
632
633impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
634 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
635 Self {
636 file_format: pb.file_format,
637 version: pb.version,
638 }
639 }
640}
641
/// Selects which component of a semantic version to bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// The first component (`X` in `X.y.z`).
    Major,
    /// The second component (`Y` in `x.Y.z`).
    Minor,
    /// The third component (`Z` in `x.y.Z`).
    Patch,
}
648
649fn bump_version(version: &mut semver::Version, part: VersionPart) {
650 match part {
651 VersionPart::Major => {
652 version.major += 1;
653 version.minor = 0;
654 version.patch = 0;
655 }
656 VersionPart::Minor => {
657 version.minor += 1;
658 version.patch = 0;
659 }
660 VersionPart::Patch => {
661 version.patch += 1;
662 }
663 }
664}
665
666impl WriterVersion {
667 fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
677 let mut parsed = semver::Version::parse(full_version).ok()?;
678
679 let prerelease = if parsed.pre.is_empty() {
680 None
681 } else {
682 Some(parsed.pre.to_string())
683 };
684
685 let build_metadata = if parsed.build.is_empty() {
686 None
687 } else {
688 Some(parsed.build.to_string())
689 };
690
691 parsed.pre = semver::Prerelease::EMPTY;
693 parsed.build = semver::BuildMetadata::EMPTY;
694 Some((parsed.to_string(), prerelease, build_metadata))
695 }
696
697 #[deprecated(note = "Use `lance_lib_version()` instead")]
700 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
701 let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
703 (
704 &self.version[..dash_idx],
705 Some(&self.version[dash_idx + 1..]),
706 )
707 } else {
708 (self.version.as_str(), None)
709 };
710
711 let mut parts = version_part.split('.');
712 let major = parts.next().unwrap_or("0").parse().ok()?;
713 let minor = parts.next().unwrap_or("0").parse().ok()?;
714 let patch = parts.next().unwrap_or("0").parse().ok()?;
715
716 Some((major, minor, patch, tag))
717 }
718
719 pub fn lance_lib_version(&self) -> Option<semver::Version> {
727 if self.library != "lance" {
728 return None;
729 }
730
731 let mut version = semver::Version::parse(&self.version).ok()?;
732
733 if let Some(ref prerelease) = self.prerelease {
734 version.pre = semver::Prerelease::new(prerelease).ok()?;
735 }
736
737 if let Some(ref build_metadata) = self.build_metadata {
738 version.build = semver::BuildMetadata::new(build_metadata).ok()?;
739 }
740
741 Some(version)
742 }
743
744 #[deprecated(
745 note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
746 )]
747 #[allow(deprecated)]
748 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
749 self.semver()
750 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
751 }
752
753 #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
759 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
760 let version = self
761 .lance_lib_version()
762 .expect("Not lance library or invalid version");
763 let other = semver::Version {
764 major: major.into(),
765 minor: minor.into(),
766 patch: patch.into(),
767 pre: semver::Prerelease::EMPTY,
768 build: semver::BuildMetadata::EMPTY,
769 };
770 version < other
771 }
772
773 #[deprecated(note = "This is meant for testing and will be made private in future version.")]
774 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
775 let mut version = self.lance_lib_version().expect("Should be lance version");
776 bump_version(&mut version, part);
777 if !keep_tag {
778 version.pre = semver::Prerelease::EMPTY;
779 }
780 let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
781 .expect("Bumped version should be valid semver");
782 Self {
783 library: self.library.clone(),
784 version: clean_version,
785 prerelease,
786 build_metadata,
787 }
788 }
789}
790
791impl Default for WriterVersion {
792 #[cfg(not(test))]
793 fn default() -> Self {
794 let full_version = env!("CARGO_PKG_VERSION");
795 let (version, prerelease, build_metadata) =
796 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
797 Self {
798 library: "lance".to_string(),
799 version,
800 prerelease,
801 build_metadata,
802 }
803 }
804
805 #[cfg(test)]
807 #[allow(deprecated)]
808 fn default() -> Self {
809 let full_version = env!("CARGO_PKG_VERSION");
810 let (version, prerelease, build_metadata) =
811 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
812 Self {
813 library: "lance".to_string(),
814 version,
815 prerelease,
816 build_metadata,
817 }
818 .bump(VersionPart::Patch, true)
819 }
820}
821
822impl ProtoStruct for Manifest {
823 type Proto = pb::Manifest;
824}
825
826impl From<pb::BasePath> for BasePath {
827 fn from(p: pb::BasePath) -> Self {
828 Self::new(p.id, p.path, p.name, p.is_dataset_root)
829 }
830}
831
832impl From<BasePath> for pb::BasePath {
833 fn from(p: BasePath) -> Self {
834 Self {
835 id: p.id,
836 name: p.name,
837 is_dataset_root: p.is_dataset_root,
838 path: p.path,
839 }
840 }
841}
842
843impl TryFrom<pb::Manifest> for Manifest {
844 type Error = Error;
845
846 fn try_from(p: pb::Manifest) -> Result<Self> {
847 let timestamp_nanos = p.timestamp.map(|ts| {
848 let sec = ts.seconds as u128 * 1e9 as u128;
849 let nanos = ts.nanos as u128;
850 sec + nanos
851 });
852 let writer_version = match p.writer_version {
854 Some(pb::manifest::WriterVersion {
855 library,
856 version,
857 prerelease,
858 build_metadata,
859 }) => Some(WriterVersion {
860 library,
861 version,
862 prerelease,
863 build_metadata,
864 }),
865 _ => None,
866 };
867 let mut interner = DataFileFieldInterner::default();
868 let fragments = Arc::new(
869 p.fragments
870 .into_iter()
871 .map(|f| interner.intern_fragment(f))
872 .collect::<Result<Vec<_>>>()?,
873 );
874 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
875 let fields_with_meta = FieldsWithMeta {
876 fields: Fields(p.fields),
877 metadata: p.schema_metadata,
878 };
879
880 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
881 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
882 {
883 return Err(Error::internal("All fragments must have row ids"));
884 }
885
886 let data_storage_format = match p.data_format {
887 None => {
888 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
889 DataStorageFormat::new(inferred_version)
891 } else {
892 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
894 DataStorageFormat::new(LanceFileVersion::Stable)
895 } else {
896 DataStorageFormat::new(LanceFileVersion::Legacy)
897 }
898 }
899 }
900 Some(format) => DataStorageFormat::from(format),
901 };
902
903 let schema = Schema::from(fields_with_meta);
904
905 Ok(Self {
906 schema,
907 version: p.version,
908 branch: p.branch,
909 writer_version,
910 version_aux_data: p.version_aux_data as usize,
911 index_section: p.index_section.map(|i| i as usize),
912 timestamp_nanos: timestamp_nanos.unwrap_or(0),
913 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
914 reader_feature_flags: p.reader_feature_flags,
915 writer_feature_flags: p.writer_feature_flags,
916 max_fragment_id: p.max_fragment_id,
917 fragments,
918 transaction_file: if p.transaction_file.is_empty() {
919 None
920 } else {
921 Some(p.transaction_file)
922 },
923 transaction_section: p.transaction_section.map(|i| i as usize),
924 fragment_offsets,
925 next_row_id: p.next_row_id,
926 data_storage_format,
927 config: p.config,
928 table_metadata: p.table_metadata,
929 base_paths: p
930 .base_paths
931 .iter()
932 .map(|item| (item.id, item.clone().into()))
933 .collect(),
934 })
935 }
936}
937
938impl From<&Manifest> for pb::Manifest {
939 fn from(m: &Manifest) -> Self {
940 let timestamp_nanos = if m.timestamp_nanos == 0 {
941 None
942 } else {
943 let nanos = m.timestamp_nanos % 1e9 as u128;
944 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
945 Some(Timestamp {
946 seconds,
947 nanos: nanos as i32,
948 })
949 };
950 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
951 Self {
952 fields: fields_with_meta.fields.0,
953 schema_metadata: m
954 .schema
955 .metadata
956 .iter()
957 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
958 .collect(),
959 version: m.version,
960 branch: m.branch.clone(),
961 writer_version: m
962 .writer_version
963 .as_ref()
964 .map(|wv| pb::manifest::WriterVersion {
965 library: wv.library.clone(),
966 version: wv.version.clone(),
967 prerelease: wv.prerelease.clone(),
968 build_metadata: wv.build_metadata.clone(),
969 }),
970 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
971 table_metadata: m.table_metadata.clone(),
972 version_aux_data: m.version_aux_data as u64,
973 index_section: m.index_section.map(|i| i as u64),
974 timestamp: timestamp_nanos,
975 tag: m.tag.clone().unwrap_or_default(),
976 reader_feature_flags: m.reader_feature_flags,
977 writer_feature_flags: m.writer_feature_flags,
978 max_fragment_id: m.max_fragment_id,
979 transaction_file: m.transaction_file.clone().unwrap_or_default(),
980 next_row_id: m.next_row_id,
981 data_format: Some(pb::manifest::DataStorageFormat {
982 file_format: m.data_storage_format.file_format.clone(),
983 version: m.data_storage_format.version.clone(),
984 }),
985 config: m.config.clone(),
986 base_paths: m
987 .base_paths
988 .values()
989 .map(|base_path| pb::BasePath {
990 id: base_path.id,
991 name: base_path.name.clone(),
992 is_dataset_root: base_path.is_dataset_root,
993 path: base_path.path.clone(),
994 })
995 .collect(),
996 transaction_section: m.transaction_section.map(|i| i as u64),
997 }
998 }
999}
1000
1001#[async_trait]
1002pub trait SelfDescribingFileReader {
1003 async fn try_new_self_described(
1011 object_store: &ObjectStore,
1012 path: &Path,
1013 cache: Option<&LanceCache>,
1014 ) -> Result<Self>
1015 where
1016 Self: Sized,
1017 {
1018 let reader = object_store.open(path).await?;
1019 Self::try_new_self_described_from_reader(reader.into(), cache).await
1020 }
1021
1022 async fn try_new_self_described_from_reader(
1023 reader: Arc<dyn Reader>,
1024 cache: Option<&LanceCache>,
1025 ) -> Result<Self>
1026 where
1027 Self: Sized;
1028}
1029
1030#[async_trait]
1031impl SelfDescribingFileReader for PreviousFileReader {
1032 async fn try_new_self_described_from_reader(
1033 reader: Arc<dyn Reader>,
1034 cache: Option<&LanceCache>,
1035 ) -> Result<Self> {
1036 let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
1037 let manifest_position = metadata.manifest_position.ok_or(Error::internal(format!(
1038 "Attempt to open file at {} as self-describing but it did not contain a manifest",
1039 reader.path(),
1040 )))?;
1041 let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
1042 if manifest.should_use_legacy_format() {
1043 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
1044 }
1045 let schema = manifest.schema;
1046 let max_field_id = schema.max_field_id().unwrap_or_default();
1047 Self::try_new_from_reader(
1048 reader.path(),
1049 reader.clone(),
1050 Some(metadata),
1051 schema,
1052 0,
1053 0,
1054 max_field_id,
1055 cache,
1056 )
1057 .await
1058 }
1059}
1060
1061#[cfg(test)]
1062mod tests {
1063 use crate::format::{DataFile, DeletionFile, DeletionFileType};
1064 use std::num::NonZero;
1065
1066 use super::*;
1067
1068 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1069 use lance_core::datatypes::Field;
1070
1071 #[test]
1072 fn test_writer_version() {
1073 let wv = WriterVersion::default();
1074 assert_eq!(wv.library, "lance");
1075
1076 let cargo_version = env!("CARGO_PKG_VERSION");
1078 let expected_tag = if cargo_version.contains('-') {
1079 Some(cargo_version.split('-').nth(1).unwrap())
1080 } else {
1081 None
1082 };
1083
1084 let version_parts: Vec<&str> = wv.version.split('.').collect();
1086 assert_eq!(
1087 version_parts.len(),
1088 3,
1089 "Version should be major.minor.patch"
1090 );
1091 assert!(
1092 !wv.version.contains('-'),
1093 "Version field should not contain prerelease"
1094 );
1095
1096 assert_eq!(wv.prerelease.as_deref(), expected_tag);
1098 assert_eq!(wv.build_metadata, None);
1100
1101 let version = wv.lance_lib_version().unwrap();
1103 assert_eq!(
1104 version.major,
1105 env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
1106 );
1107 assert_eq!(
1108 version.minor,
1109 env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
1110 );
1111 assert_eq!(
1112 version.patch,
1113 env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
1115 );
1116 assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));
1117
1118 for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
1119 let mut bumped_version = version.clone();
1120 bump_version(&mut bumped_version, *part);
1121 assert!(version < bumped_version);
1122 }
1123 }
1124
1125 #[test]
1126 fn test_writer_version_split() {
1127 let (version, prerelease, build_metadata) =
1129 WriterVersion::split_version("2.0.0-rc.1").unwrap();
1130 assert_eq!(version, "2.0.0");
1131 assert_eq!(prerelease, Some("rc.1".to_string()));
1132 assert_eq!(build_metadata, None);
1133
1134 let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
1136 assert_eq!(version, "2.0.0");
1137 assert_eq!(prerelease, None);
1138 assert_eq!(build_metadata, None);
1139
1140 let (version, prerelease, build_metadata) =
1142 WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
1143 assert_eq!(version, "2.0.0");
1144 assert_eq!(prerelease, Some("rc.1".to_string()));
1145 assert_eq!(build_metadata, Some("build.123".to_string()));
1146
1147 let (version, prerelease, build_metadata) =
1149 WriterVersion::split_version("2.0.0+build.123").unwrap();
1150 assert_eq!(version, "2.0.0");
1151 assert_eq!(prerelease, None);
1152 assert_eq!(build_metadata, Some("build.123".to_string()));
1153
1154 assert!(WriterVersion::split_version("not-a-version").is_none());
1156 }
1157
1158 #[test]
1159 fn test_writer_version_comparison_with_prerelease() {
1160 let v1 = WriterVersion {
1161 library: "lance".to_string(),
1162 version: "2.0.0".to_string(),
1163 prerelease: Some("rc.1".to_string()),
1164 build_metadata: None,
1165 };
1166
1167 let v2 = WriterVersion {
1168 library: "lance".to_string(),
1169 version: "2.0.0".to_string(),
1170 prerelease: None,
1171 build_metadata: None,
1172 };
1173
1174 let semver1 = v1.lance_lib_version().unwrap();
1175 let semver2 = v2.lance_lib_version().unwrap();
1176
1177 assert!(semver1 < semver2);
1179 }
1180
1181 #[test]
1182 fn test_writer_version_with_build_metadata() {
1183 let v = WriterVersion {
1184 library: "lance".to_string(),
1185 version: "2.0.0".to_string(),
1186 prerelease: Some("rc.1".to_string()),
1187 build_metadata: Some("build.123".to_string()),
1188 };
1189
1190 let semver = v.lance_lib_version().unwrap();
1191 assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
1192 assert_eq!(semver.major, 2);
1193 assert_eq!(semver.minor, 0);
1194 assert_eq!(semver.patch, 0);
1195 assert_eq!(semver.pre.as_str(), "rc.1");
1196 assert_eq!(semver.build.as_str(), "build.123");
1197 }
1198
1199 #[test]
1200 fn test_writer_version_non_semver() {
1201 let v = WriterVersion {
1203 library: "lance".to_string(),
1204 version: "custom-build-v1".to_string(),
1205 prerelease: None,
1206 build_metadata: None,
1207 };
1208
1209 assert!(v.lance_lib_version().is_none());
1211
1212 assert_eq!(v.library, "lance");
1214 assert_eq!(v.version, "custom-build-v1");
1215 }
1216
1217 #[test]
1218 #[allow(deprecated)]
1219 fn test_older_than_with_prerelease() {
1220 let v_rc = WriterVersion {
1222 library: "lance".to_string(),
1223 version: "2.0.0".to_string(),
1224 prerelease: Some("rc.1".to_string()),
1225 build_metadata: None,
1226 };
1227
1228 assert!(v_rc.older_than(2, 0, 0));
1230
1231 assert!(v_rc.older_than(2, 0, 1));
1233
1234 assert!(!v_rc.older_than(1, 9, 9));
1236
1237 let v_release = WriterVersion {
1238 library: "lance".to_string(),
1239 version: "2.0.0".to_string(),
1240 prerelease: None,
1241 build_metadata: None,
1242 };
1243
1244 assert!(!v_release.older_than(2, 0, 0));
1246
1247 assert!(v_release.older_than(2, 0, 1));
1249 }
1250
1251 #[test]
1252 fn test_fragments_by_offset_range() {
1253 let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1254 "a",
1255 arrow_schema::DataType::Int64,
1256 false,
1257 )]);
1258 let schema = Schema::try_from(&arrow_schema).unwrap();
1259 let fragments = vec![
1260 Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1261 Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1262 Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1263 ];
1264 let manifest = Manifest::new(
1265 schema,
1266 Arc::new(fragments),
1267 DataStorageFormat::default(),
1268 HashMap::new(),
1269 );
1270
1271 let actual = manifest.fragments_by_offset_range(0..10);
1272 assert_eq!(actual.len(), 1);
1273 assert_eq!(actual[0].0, 0);
1274 assert_eq!(actual[0].1.id, 0);
1275
1276 let actual = manifest.fragments_by_offset_range(5..15);
1277 assert_eq!(actual.len(), 2);
1278 assert_eq!(actual[0].0, 0);
1279 assert_eq!(actual[0].1.id, 0);
1280 assert_eq!(actual[1].0, 10);
1281 assert_eq!(actual[1].1.id, 1);
1282
1283 let actual = manifest.fragments_by_offset_range(15..50);
1284 assert_eq!(actual.len(), 2);
1285 assert_eq!(actual[0].0, 10);
1286 assert_eq!(actual[0].1.id, 1);
1287 assert_eq!(actual[1].0, 25);
1288 assert_eq!(actual[1].1.id, 2);
1289
1290 let actual = manifest.fragments_by_offset_range(45..100);
1292 assert!(actual.is_empty());
1293
1294 assert!(manifest.fragments_by_offset_range(200..400).is_empty());
1295 }
1296
1297 #[test]
1298 fn test_max_field_id() {
1299 let mut field0 =
1301 Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
1302 field0.set_id(-1, &mut 0);
1303 let mut field2 =
1304 Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
1305 field2.set_id(-1, &mut 2);
1306
1307 let schema = Schema {
1308 fields: vec![field0, field2],
1309 metadata: Default::default(),
1310 };
1311 let fragments = vec![
1312 Fragment {
1313 id: 0,
1314 files: vec![DataFile::new_legacy_from_fields(
1315 "path1",
1316 vec![0, 1, 2],
1317 None,
1318 )],
1319 deletion_file: None,
1320 row_id_meta: None,
1321 physical_rows: None,
1322 created_at_version_meta: None,
1323 last_updated_at_version_meta: None,
1324 },
1325 Fragment {
1326 id: 1,
1327 files: vec![
1328 DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
1329 DataFile::new_legacy_from_fields("path3", vec![2], None),
1330 ],
1331 deletion_file: None,
1332 row_id_meta: None,
1333 physical_rows: None,
1334 created_at_version_meta: None,
1335 last_updated_at_version_meta: None,
1336 },
1337 ];
1338
1339 let manifest = Manifest::new(
1340 schema,
1341 Arc::new(fragments),
1342 DataStorageFormat::default(),
1343 HashMap::new(),
1344 );
1345
1346 assert_eq!(manifest.max_field_id(), 43);
1347 }
1348
1349 #[test]
1350 fn test_config() {
1351 let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1352 "a",
1353 arrow_schema::DataType::Int64,
1354 false,
1355 )]);
1356 let schema = Schema::try_from(&arrow_schema).unwrap();
1357 let fragments = vec![
1358 Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1359 Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1360 Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1361 ];
1362 let mut manifest = Manifest::new(
1363 schema,
1364 Arc::new(fragments),
1365 DataStorageFormat::default(),
1366 HashMap::new(),
1367 );
1368
1369 let mut config = manifest.config.clone();
1370 config.insert("lance.test".to_string(), "value".to_string());
1371 config.insert("other-key".to_string(), "other-value".to_string());
1372
1373 manifest.config_mut().extend(config.clone());
1374 assert_eq!(manifest.config, config.clone());
1375
1376 config.remove("other-key");
1377 manifest.config_mut().remove("other-key");
1378 assert_eq!(manifest.config, config);
1379 }
1380
1381 #[test]
1382 fn test_manifest_summary() {
1383 let arrow_schema = ArrowSchema::new(vec![
1385 ArrowField::new("id", arrow_schema::DataType::Int64, false),
1386 ArrowField::new("name", arrow_schema::DataType::Utf8, true),
1387 ]);
1388 let schema = Schema::try_from(&arrow_schema).unwrap();
1389
1390 let empty_manifest = Manifest::new(
1391 schema.clone(),
1392 Arc::new(vec![]),
1393 DataStorageFormat::default(),
1394 HashMap::new(),
1395 );
1396
1397 let empty_summary = empty_manifest.summary();
1398 assert_eq!(empty_summary.total_rows, 0);
1399 assert_eq!(empty_summary.total_files_size, 0);
1400 assert_eq!(empty_summary.total_fragments, 0);
1401 assert_eq!(empty_summary.total_data_files, 0);
1402 assert_eq!(empty_summary.total_deletion_file_rows, 0);
1403 assert_eq!(empty_summary.total_data_file_rows, 0);
1404 assert_eq!(empty_summary.total_deletion_files, 0);
1405
1406 let empty_fragments = vec![
1408 Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
1409 Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
1410 ];
1411
1412 let empty_files_manifest = Manifest::new(
1413 schema.clone(),
1414 Arc::new(empty_fragments),
1415 DataStorageFormat::default(),
1416 HashMap::new(),
1417 );
1418
1419 let empty_files_summary = empty_files_manifest.summary();
1420 assert_eq!(empty_files_summary.total_rows, 0);
1421 assert_eq!(empty_files_summary.total_files_size, 0);
1422 assert_eq!(empty_files_summary.total_fragments, 2);
1423 assert_eq!(empty_files_summary.total_data_files, 2);
1424 assert_eq!(empty_files_summary.total_deletion_file_rows, 0);
1425 assert_eq!(empty_files_summary.total_data_file_rows, 0);
1426 assert_eq!(empty_files_summary.total_deletion_files, 0);
1427
1428 let real_fragments = vec![
1430 Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
1431 Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
1432 Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
1433 ];
1434
1435 let real_data_manifest = Manifest::new(
1436 schema.clone(),
1437 Arc::new(real_fragments),
1438 DataStorageFormat::default(),
1439 HashMap::new(),
1440 );
1441
1442 let real_data_summary = real_data_manifest.summary();
1443 assert_eq!(real_data_summary.total_rows, 425); assert_eq!(real_data_summary.total_files_size, 0); assert_eq!(real_data_summary.total_fragments, 3);
1446 assert_eq!(real_data_summary.total_data_files, 3);
1447 assert_eq!(real_data_summary.total_deletion_file_rows, 0);
1448 assert_eq!(real_data_summary.total_data_file_rows, 425);
1449 assert_eq!(real_data_summary.total_deletion_files, 0);
1450
1451 let file_version = LanceFileVersion::default();
1452 let mut fragment_with_deletion = Fragment::new(0)
1454 .with_file(
1455 "data_with_deletion.lance",
1456 vec![0, 1],
1457 vec![0, 1],
1458 &file_version,
1459 NonZero::new(1000),
1460 )
1461 .with_physical_rows(50);
1462 fragment_with_deletion.deletion_file = Some(DeletionFile {
1463 read_version: 123,
1464 id: 456,
1465 file_type: DeletionFileType::Array,
1466 num_deleted_rows: Some(10),
1467 base_id: None,
1468 });
1469
1470 let manifest_with_deletion = Manifest::new(
1471 schema,
1472 Arc::new(vec![fragment_with_deletion]),
1473 DataStorageFormat::default(),
1474 HashMap::new(),
1475 );
1476
1477 let deletion_summary = manifest_with_deletion.summary();
1478 assert_eq!(deletion_summary.total_rows, 40); assert_eq!(deletion_summary.total_files_size, 1000);
1480 assert_eq!(deletion_summary.total_fragments, 1);
1481 assert_eq!(deletion_summary.total_data_files, 1);
1482 assert_eq!(deletion_summary.total_deletion_file_rows, 10);
1483 assert_eq!(deletion_summary.total_data_file_rows, 50);
1484 assert_eq!(deletion_summary.total_deletion_files, 1);
1485
1486 let stats_map: BTreeMap<String, String> = deletion_summary.into();
1488 assert_eq!(stats_map.len(), 7)
1489 }
1490}