1use async_trait::async_trait;
5use chrono::prelude::*;
6use deepsize::DeepSizeOf;
7use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
8use lance_file::previous::reader::FileReader as PreviousFileReader;
9use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
10use lance_io::traits::{ProtoStruct, Reader};
11use object_store::path::Path;
12use prost::Message;
13use prost_types::Timestamp;
14use std::collections::{BTreeMap, HashMap};
15use std::ops::Range;
16use std::sync::Arc;
17
18use super::Fragment;
19use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_STABLE_ROW_IDS};
20use crate::format::pb;
21use lance_core::cache::LanceCache;
22use lance_core::datatypes::Schema;
23use lance_core::{Error, Result};
24use lance_io::object_store::{ObjectStore, ObjectStoreRegistry};
25use lance_io::utils::read_struct;
26use snafu::location;
27
/// In-memory form of a dataset manifest.
///
/// Converted from and to the protobuf `pb::Manifest` message by the
/// `TryFrom<pb::Manifest>` and `From<&Manifest>` impls in this file.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Schema of the dataset at this version.
    pub schema: Schema,

    /// Version number of this manifest. `Manifest::new` starts at 1 and
    /// `Manifest::new_from_previous` uses the previous version plus one.
    pub version: u64,

    /// Branch this manifest belongs to, if any.
    pub branch: Option<String>,

    /// Library/version that wrote the manifest; `None` when the stored
    /// message carried no writer version.
    pub writer_version: Option<WriterVersion>,

    /// Fragments that make up this version of the dataset.
    pub fragments: Arc<Vec<Fragment>>,

    /// Stored as a `u64` in the protobuf message.
    /// NOTE(review): presumably a file position of auxiliary version data — confirm.
    pub version_aux_data: usize,

    /// NOTE(review): presumably the file position of the index section — confirm.
    /// Reset to `None` by `new_from_previous` and `shallow_clone`.
    pub index_section: Option<usize>,

    /// Timestamp in nanoseconds; `timestamp()` interprets it as time since the
    /// Unix epoch. A value of 0 is serialized as "no timestamp".
    pub timestamp_nanos: u128,

    /// Optional tag; an empty tag string in the protobuf message is read as `None`.
    pub tag: Option<String>,

    /// Feature-flag bits for readers; `FLAG_STABLE_ROW_IDS` is tested against this field.
    pub reader_feature_flags: u64,

    /// Feature-flag bits for writers.
    pub writer_feature_flags: u64,

    /// Highest fragment id recorded so far. Only ever raised by
    /// `update_max_fragment_id`; `None` until first set.
    pub max_fragment_id: Option<u32>,

    /// Transaction file associated with this manifest; an empty string in the
    /// protobuf message is read as `None`.
    pub transaction_file: Option<String>,

    /// NOTE(review): presumably the file position of an inline transaction — confirm.
    pub transaction_section: Option<usize>,

    /// Cumulative row offset of each fragment followed by the total row count;
    /// derived from `fragments` by `compute_fragment_offsets`.
    fragment_offsets: Vec<usize>,

    /// NOTE(review): presumably the next row id to hand out — confirm.
    /// Carried over unchanged by `new_from_previous`.
    pub next_row_id: u64,

    /// File format name and version used for the data files.
    pub data_storage_format: DataStorageFormat,

    /// String key/value configuration stored with the manifest.
    pub config: HashMap<String, String>,

    /// String key/value table metadata stored with the manifest.
    pub table_metadata: HashMap<String, String>,

    /// Base paths keyed by their `BasePath::id`.
    pub base_paths: HashMap<u32, BasePath>,
}
105
/// Bit mask selecting the most significant bit of a `u64` version number.
/// Versions with this bit set are reported as detached by
/// [`is_detached_version`].
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

/// Returns `true` when the bit selected by [`DETACHED_VERSION_MASK`] is set
/// in `version`.
pub fn is_detached_version(version: u64) -> bool {
    let detached_bit = version & DETACHED_VERSION_MASK;
    detached_bit != 0
}
112
113fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
114 fragments
115 .iter()
116 .map(|f| f.num_rows().unwrap_or_default())
117 .chain([0]) .scan(0_usize, |offset, len| {
119 let start = *offset;
120 *offset += len;
121 Some(start)
122 })
123 .collect()
124}
125
/// Aggregate counters describing the contents of a manifest.
#[derive(Default)]
pub struct ManifestSummary {
    /// Number of fragments.
    pub total_fragments: u64,
    /// Number of data files across all fragments.
    pub total_data_files: u64,
    /// Sum of the known data file sizes, in bytes.
    pub total_files_size: u64,
    /// Number of fragments that have a deletion file.
    pub total_deletion_files: u64,
    /// Number of rows stored in data files.
    pub total_data_file_rows: u64,
    /// Number of rows recorded as deleted in deletion files.
    pub total_deletion_file_rows: u64,
    /// Number of rows in the dataset.
    pub total_rows: u64,
}

impl From<ManifestSummary> for BTreeMap<String, String> {
    /// Renders every counter as a decimal string keyed by its field name.
    fn from(summary: ManifestSummary) -> Self {
        let counters: [(&str, u64); 7] = [
            ("total_fragments", summary.total_fragments),
            ("total_data_files", summary.total_data_files),
            ("total_files_size", summary.total_files_size),
            ("total_deletion_files", summary.total_deletion_files),
            ("total_data_file_rows", summary.total_data_file_rows),
            (
                "total_deletion_file_rows",
                summary.total_deletion_file_rows,
            ),
            ("total_rows", summary.total_rows),
        ];

        counters
            .iter()
            .map(|&(name, count)| (name.to_string(), count.to_string()))
            .collect()
    }
}
168
169impl Manifest {
170 pub fn new(
171 schema: Schema,
172 fragments: Arc<Vec<Fragment>>,
173 data_storage_format: DataStorageFormat,
174 base_paths: HashMap<u32, BasePath>,
175 ) -> Self {
176 let fragment_offsets = compute_fragment_offsets(&fragments);
177
178 Self {
179 schema,
180 version: 1,
181 branch: None,
182 writer_version: Some(WriterVersion::default()),
183 fragments,
184 version_aux_data: 0,
185 index_section: None,
186 timestamp_nanos: 0,
187 tag: None,
188 reader_feature_flags: 0,
189 writer_feature_flags: 0,
190 max_fragment_id: None,
191 transaction_file: None,
192 transaction_section: None,
193 fragment_offsets,
194 next_row_id: 0,
195 data_storage_format,
196 config: HashMap::new(),
197 table_metadata: HashMap::new(),
198 base_paths,
199 }
200 }
201
202 pub fn new_from_previous(
203 previous: &Self,
204 schema: Schema,
205 fragments: Arc<Vec<Fragment>>,
206 ) -> Self {
207 let fragment_offsets = compute_fragment_offsets(&fragments);
208
209 Self {
210 schema,
211 version: previous.version + 1,
212 branch: previous.branch.clone(),
213 writer_version: Some(WriterVersion::default()),
214 fragments,
215 version_aux_data: 0,
216 index_section: None, timestamp_nanos: 0, tag: None,
219 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
222 transaction_file: None,
223 transaction_section: None,
224 fragment_offsets,
225 next_row_id: previous.next_row_id,
226 data_storage_format: previous.data_storage_format.clone(),
227 config: previous.config.clone(),
228 table_metadata: previous.table_metadata.clone(),
229 base_paths: previous.base_paths.clone(),
230 }
231 }
232
233 pub fn shallow_clone(
238 &self,
239 ref_name: Option<String>,
240 ref_path: String,
241 ref_base_id: u32,
242 branch_name: Option<String>,
243 transaction_file: String,
244 ) -> Self {
245 let cloned_fragments = self
246 .fragments
247 .as_ref()
248 .iter()
249 .map(|fragment| {
250 let mut cloned_fragment = fragment.clone();
251 for file in &mut cloned_fragment.files {
252 if file.base_id.is_none() {
253 file.base_id = Some(ref_base_id);
254 }
255 }
256
257 if let Some(deletion) = &mut cloned_fragment.deletion_file {
258 if deletion.base_id.is_none() {
259 deletion.base_id = Some(ref_base_id);
260 }
261 }
262 cloned_fragment
263 })
264 .collect::<Vec<_>>();
265
266 Self {
267 schema: self.schema.clone(),
268 version: self.version,
269 branch: branch_name,
270 writer_version: self.writer_version.clone(),
271 fragments: Arc::new(cloned_fragments),
272 version_aux_data: self.version_aux_data,
273 index_section: None, timestamp_nanos: self.timestamp_nanos,
275 tag: None,
276 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: self.max_fragment_id,
279 transaction_file: Some(transaction_file),
280 transaction_section: None,
281 fragment_offsets: self.fragment_offsets.clone(),
282 next_row_id: self.next_row_id,
283 data_storage_format: self.data_storage_format.clone(),
284 config: self.config.clone(),
285 base_paths: {
286 let mut base_paths = self.base_paths.clone();
287 let base_path = BasePath::new(ref_base_id, ref_path, ref_name, true);
288 base_paths.insert(ref_base_id, base_path);
289 base_paths
290 },
291 table_metadata: self.table_metadata.clone(),
292 }
293 }
294
295 pub fn timestamp(&self) -> DateTime<Utc> {
297 let nanos = self.timestamp_nanos % 1_000_000_000;
298 let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
299 Utc.from_utc_datetime(
300 &DateTime::from_timestamp(seconds, nanos as u32)
301 .unwrap_or_default()
302 .naive_utc(),
303 )
304 }
305
    /// Sets the manifest timestamp to `nanos` (stored verbatim in `timestamp_nanos`).
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }
310
    /// Returns a mutable reference to the config key/value map.
    pub fn config_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.config
    }
315
    /// Returns a mutable reference to the table metadata key/value map.
    pub fn table_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.table_metadata
    }
320
    /// Returns a mutable reference to the metadata map stored on the schema.
    pub fn schema_metadata_mut(&mut self) -> &mut HashMap<String, String> {
        &mut self.schema.metadata
    }
325
326 pub fn field_metadata_mut(&mut self, field_id: i32) -> Option<&mut HashMap<String, String>> {
330 self.schema
331 .field_by_id_mut(field_id)
332 .map(|field| &mut field.metadata)
333 }
334
335 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
337 pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
338 self.config.extend(upsert_values);
339 }
340
341 #[deprecated(note = "Use config_mut() for direct access to config HashMap")]
343 pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
344 self.config
345 .retain(|key, _| !delete_keys.contains(&key.as_str()));
346 }
347
    /// Replaces the schema's metadata map wholesale with `new_metadata`.
    #[deprecated(note = "Use schema_metadata_mut() for direct access to schema metadata HashMap")]
    pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }
353
354 #[deprecated(
358 note = "Use field_metadata_mut(field_id) for direct access to field metadata HashMap"
359 )]
360 pub fn replace_field_metadata(
361 &mut self,
362 field_id: i32,
363 new_metadata: HashMap<String, String>,
364 ) -> Result<()> {
365 if let Some(field) = self.schema.field_by_id_mut(field_id) {
366 field.metadata = new_metadata;
367 Ok(())
368 } else {
369 Err(Error::invalid_input(
370 format!(
371 "Field with id {} does not exist for replace_field_metadata",
372 field_id
373 ),
374 location!(),
375 ))
376 }
377 }
378
379 pub fn update_max_fragment_id(&mut self) {
381 if self.fragments.is_empty() {
383 return;
384 }
385
386 let max_fragment_id = self
387 .fragments
388 .iter()
389 .map(|f| f.id)
390 .max()
391 .unwrap() .try_into()
393 .unwrap();
394
395 match self.max_fragment_id {
396 None => {
397 self.max_fragment_id = Some(max_fragment_id);
399 }
400 Some(current_max) => {
401 if max_fragment_id > current_max {
404 self.max_fragment_id = Some(max_fragment_id);
405 }
406 }
407 }
408 }
409
410 pub fn max_fragment_id(&self) -> Option<u64> {
415 if let Some(max_id) = self.max_fragment_id {
416 Some(max_id.into())
418 } else {
419 self.fragments.iter().map(|f| f.id).max()
421 }
422 }
423
424 pub fn max_field_id(&self) -> i32 {
429 let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
430 let fragment_max_id = self
431 .fragments
432 .iter()
433 .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
434 .max()
435 .copied();
436 let fragment_max_id = fragment_max_id.unwrap_or(-1);
437 schema_max_id.max(fragment_max_id)
438 }
439
440 pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
443 if since.version >= self.version {
444 return Err(Error::io(
445 format!(
446 "fragments_since: given version {} is newer than manifest version {}",
447 since.version, self.version
448 ),
449 location!(),
450 ));
451 }
452 let start = since.max_fragment_id();
453 Ok(self
454 .fragments
455 .iter()
456 .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
457 .cloned()
458 .collect())
459 }
460
461 pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
477 let start = range.start;
478 let end = range.end;
479 let idx = self
480 .fragment_offsets
481 .binary_search(&start)
482 .unwrap_or_else(|idx| idx - 1);
483
484 let mut fragments = vec![];
485 for i in idx..self.fragments.len() {
486 if self.fragment_offsets[i] >= end
487 || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
488 <= start
489 {
490 break;
491 }
492 fragments.push((self.fragment_offsets[i], &self.fragments[i]));
493 }
494
495 fragments
496 }
497
    /// Returns `true` when the `FLAG_STABLE_ROW_IDS` bit is set in the reader
    /// feature flags.
    pub fn uses_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_STABLE_ROW_IDS != 0
    }
502
503 pub fn serialized(&self) -> Vec<u8> {
506 let pb_manifest: pb::Manifest = self.into();
507 pb_manifest.encode_to_vec()
508 }
509
    /// Returns `true` when the data storage format version equals
    /// `LEGACY_FORMAT_VERSION`.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
513
514 pub fn summary(&self) -> ManifestSummary {
525 let mut summary =
527 self.fragments
528 .iter()
529 .fold(ManifestSummary::default(), |mut summary, f| {
530 summary.total_data_files += f.files.len() as u64;
532 if let Some(num_rows) = f.num_rows() {
534 summary.total_rows += num_rows as u64;
535 }
536 for data_file in &f.files {
538 if let Some(size_bytes) = data_file.file_size_bytes.get() {
539 summary.total_files_size += size_bytes.get();
540 }
541 }
542 if f.deletion_file.is_some() {
544 summary.total_deletion_files += 1;
545 }
546 if let Some(deletion_file) = &f.deletion_file {
548 if let Some(num_deleted) = deletion_file.num_deleted_rows {
549 summary.total_deletion_file_rows += num_deleted as u64;
550 }
551 }
552 summary
553 });
554 summary.total_fragments = self.fragments.len() as u64;
555 summary.total_data_file_rows = summary.total_rows + summary.total_deletion_file_rows;
556
557 summary
558 }
559}
560
/// A base location that files referenced by a manifest can be resolved against.
#[derive(Debug, Clone, PartialEq)]
pub struct BasePath {
    /// Identifier of this base path; used as the key in `Manifest::base_paths`.
    pub id: u32,
    /// Optional name for this base path.
    pub name: Option<String>,
    /// Whether `path` is a dataset root.
    /// NOTE(review): as opposed to, presumably, a plain data directory — confirm.
    pub is_dataset_root: bool,
    /// URI of the base location; parsed by `extract_path`.
    pub path: String,
}
569
570impl BasePath {
571 pub fn new(id: u32, path: String, name: Option<String>, is_dataset_root: bool) -> Self {
580 Self {
581 id,
582 name,
583 is_dataset_root,
584 path,
585 }
586 }
587
588 pub fn extract_path(&self, registry: Arc<ObjectStoreRegistry>) -> Result<Path> {
592 ObjectStore::extract_path_from_uri(registry, &self.path)
593 }
594}
595
596impl DeepSizeOf for BasePath {
597 fn deep_size_of_children(&self, context: &mut deepsize::Context) -> usize {
598 self.name.deep_size_of_children(context)
599 + self.path.deep_size_of_children(context) * 2
600 + size_of::<bool>()
601 }
602}
603
/// Identifies the library, and its version, that wrote a manifest.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    /// Name of the writing library; `lance_lib_version()` only returns a
    /// version when this is `"lance"`.
    pub library: String,
    /// Version string. For versions produced by `split_version` this is the
    /// bare `major.minor.patch` part.
    pub version: String,
    /// Semver prerelease identifier (e.g. `rc.1`), if any.
    pub prerelease: Option<String>,
    /// Semver build metadata (e.g. `build.123`), if any.
    pub build_metadata: Option<String>,
}
611
/// Name and version of the file format used for data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    /// Name of the file format; `DataStorageFormat::new` sets it to `"lance"`.
    pub file_format: String,
    /// Version string of the file format; parsed by `lance_file_version()`.
    pub version: String,
}
617
618const LANCE_FORMAT_NAME: &str = "lance";
619
620impl DataStorageFormat {
621 pub fn new(version: LanceFileVersion) -> Self {
622 Self {
623 file_format: LANCE_FORMAT_NAME.to_string(),
624 version: version.resolve().to_string(),
625 }
626 }
627
628 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
629 self.version.parse::<LanceFileVersion>()
630 }
631}
632
impl Default for DataStorageFormat {
    /// Returns the storage format for the default `LanceFileVersion`.
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}
638
639impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
640 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
641 Self {
642 file_format: pb.file_format,
643 version: pb.version,
644 }
645 }
646}
647
/// Selects which component of a semantic version `bump_version` increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// The major component; bumping it resets minor and patch to 0.
    Major,
    /// The minor component; bumping it resets patch to 0.
    Minor,
    /// The patch component.
    Patch,
}
654
655fn bump_version(version: &mut semver::Version, part: VersionPart) {
656 match part {
657 VersionPart::Major => {
658 version.major += 1;
659 version.minor = 0;
660 version.patch = 0;
661 }
662 VersionPart::Minor => {
663 version.minor += 1;
664 version.patch = 0;
665 }
666 VersionPart::Patch => {
667 version.patch += 1;
668 }
669 }
670}
671
672impl WriterVersion {
673 fn split_version(full_version: &str) -> Option<(String, Option<String>, Option<String>)> {
683 let mut parsed = semver::Version::parse(full_version).ok()?;
684
685 let prerelease = if parsed.pre.is_empty() {
686 None
687 } else {
688 Some(parsed.pre.to_string())
689 };
690
691 let build_metadata = if parsed.build.is_empty() {
692 None
693 } else {
694 Some(parsed.build.to_string())
695 };
696
697 parsed.pre = semver::Prerelease::EMPTY;
699 parsed.build = semver::BuildMetadata::EMPTY;
700 Some((parsed.to_string(), prerelease, build_metadata))
701 }
702
703 #[deprecated(note = "Use `lance_lib_version()` instead")]
706 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
707 let (version_part, tag) = if let Some(dash_idx) = self.version.find('-') {
709 (
710 &self.version[..dash_idx],
711 Some(&self.version[dash_idx + 1..]),
712 )
713 } else {
714 (self.version.as_str(), None)
715 };
716
717 let mut parts = version_part.split('.');
718 let major = parts.next().unwrap_or("0").parse().ok()?;
719 let minor = parts.next().unwrap_or("0").parse().ok()?;
720 let patch = parts.next().unwrap_or("0").parse().ok()?;
721
722 Some((major, minor, patch, tag))
723 }
724
725 pub fn lance_lib_version(&self) -> Option<semver::Version> {
733 if self.library != "lance" {
734 return None;
735 }
736
737 let mut version = semver::Version::parse(&self.version).ok()?;
738
739 if let Some(ref prerelease) = self.prerelease {
740 version.pre = semver::Prerelease::new(prerelease).ok()?;
741 }
742
743 if let Some(ref build_metadata) = self.build_metadata {
744 version.build = semver::BuildMetadata::new(build_metadata).ok()?;
745 }
746
747 Some(version)
748 }
749
750 #[deprecated(
751 note = "Use `lance_lib_version()` instead, which safely checks the library field and returns Option"
752 )]
753 #[allow(deprecated)]
754 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
755 self.semver()
756 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
757 }
758
759 #[deprecated(note = "Use `lance_lib_version()` and its `older_than` method instead.")]
765 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
766 let version = self
767 .lance_lib_version()
768 .expect("Not lance library or invalid version");
769 let other = semver::Version {
770 major: major.into(),
771 minor: minor.into(),
772 patch: patch.into(),
773 pre: semver::Prerelease::EMPTY,
774 build: semver::BuildMetadata::EMPTY,
775 };
776 version < other
777 }
778
779 #[deprecated(note = "This is meant for testing and will be made private in future version.")]
780 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
781 let mut version = self.lance_lib_version().expect("Should be lance version");
782 bump_version(&mut version, part);
783 if !keep_tag {
784 version.pre = semver::Prerelease::EMPTY;
785 }
786 let (clean_version, prerelease, build_metadata) = Self::split_version(&version.to_string())
787 .expect("Bumped version should be valid semver");
788 Self {
789 library: self.library.clone(),
790 version: clean_version,
791 prerelease,
792 build_metadata,
793 }
794 }
795}
796
797impl Default for WriterVersion {
798 #[cfg(not(test))]
799 fn default() -> Self {
800 let full_version = env!("CARGO_PKG_VERSION");
801 let (version, prerelease, build_metadata) =
802 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
803 Self {
804 library: "lance".to_string(),
805 version,
806 prerelease,
807 build_metadata,
808 }
809 }
810
811 #[cfg(test)]
813 #[allow(deprecated)]
814 fn default() -> Self {
815 let full_version = env!("CARGO_PKG_VERSION");
816 let (version, prerelease, build_metadata) =
817 Self::split_version(full_version).expect("CARGO_PKG_VERSION should be valid semver");
818 Self {
819 library: "lance".to_string(),
820 version,
821 prerelease,
822 build_metadata,
823 }
824 .bump(VersionPart::Patch, true)
825 }
826}
827
// Associates `Manifest` with its protobuf message type `pb::Manifest`.
impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}
831
832impl From<pb::BasePath> for BasePath {
833 fn from(p: pb::BasePath) -> Self {
834 Self::new(p.id, p.path, p.name, p.is_dataset_root)
835 }
836}
837
838impl From<BasePath> for pb::BasePath {
839 fn from(p: BasePath) -> Self {
840 Self {
841 id: p.id,
842 name: p.name,
843 is_dataset_root: p.is_dataset_root,
844 path: p.path,
845 }
846 }
847}
848
849impl TryFrom<pb::Manifest> for Manifest {
850 type Error = Error;
851
852 fn try_from(p: pb::Manifest) -> Result<Self> {
853 let timestamp_nanos = p.timestamp.map(|ts| {
854 let sec = ts.seconds as u128 * 1e9 as u128;
855 let nanos = ts.nanos as u128;
856 sec + nanos
857 });
858 let writer_version = match p.writer_version {
860 Some(pb::manifest::WriterVersion {
861 library,
862 version,
863 prerelease,
864 build_metadata,
865 }) => Some(WriterVersion {
866 library,
867 version,
868 prerelease,
869 build_metadata,
870 }),
871 _ => None,
872 };
873 let fragments = Arc::new(
874 p.fragments
875 .into_iter()
876 .map(Fragment::try_from)
877 .collect::<Result<Vec<_>>>()?,
878 );
879 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
880 let fields_with_meta = FieldsWithMeta {
881 fields: Fields(p.fields),
882 metadata: p.schema_metadata,
883 };
884
885 if FLAG_STABLE_ROW_IDS & p.reader_feature_flags != 0
886 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
887 {
888 return Err(Error::Internal {
889 message: "All fragments must have row ids".into(),
890 location: location!(),
891 });
892 }
893
894 let data_storage_format = match p.data_format {
895 None => {
896 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
897 DataStorageFormat::new(inferred_version)
899 } else {
900 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
902 DataStorageFormat::new(LanceFileVersion::Stable)
903 } else {
904 DataStorageFormat::new(LanceFileVersion::Legacy)
905 }
906 }
907 }
908 Some(format) => DataStorageFormat::from(format),
909 };
910
911 let schema = Schema::from(fields_with_meta);
912
913 Ok(Self {
914 schema,
915 version: p.version,
916 branch: p.branch,
917 writer_version,
918 version_aux_data: p.version_aux_data as usize,
919 index_section: p.index_section.map(|i| i as usize),
920 timestamp_nanos: timestamp_nanos.unwrap_or(0),
921 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
922 reader_feature_flags: p.reader_feature_flags,
923 writer_feature_flags: p.writer_feature_flags,
924 max_fragment_id: p.max_fragment_id,
925 fragments,
926 transaction_file: if p.transaction_file.is_empty() {
927 None
928 } else {
929 Some(p.transaction_file)
930 },
931 transaction_section: p.transaction_section.map(|i| i as usize),
932 fragment_offsets,
933 next_row_id: p.next_row_id,
934 data_storage_format,
935 config: p.config,
936 table_metadata: p.table_metadata,
937 base_paths: p
938 .base_paths
939 .iter()
940 .map(|item| (item.id, item.clone().into()))
941 .collect(),
942 })
943 }
944}
945
946impl From<&Manifest> for pb::Manifest {
947 fn from(m: &Manifest) -> Self {
948 let timestamp_nanos = if m.timestamp_nanos == 0 {
949 None
950 } else {
951 let nanos = m.timestamp_nanos % 1e9 as u128;
952 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
953 Some(Timestamp {
954 seconds,
955 nanos: nanos as i32,
956 })
957 };
958 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
959 Self {
960 fields: fields_with_meta.fields.0,
961 schema_metadata: m
962 .schema
963 .metadata
964 .iter()
965 .map(|(k, v)| (k.clone(), v.as_bytes().to_vec()))
966 .collect(),
967 version: m.version,
968 branch: m.branch.clone(),
969 writer_version: m
970 .writer_version
971 .as_ref()
972 .map(|wv| pb::manifest::WriterVersion {
973 library: wv.library.clone(),
974 version: wv.version.clone(),
975 prerelease: wv.prerelease.clone(),
976 build_metadata: wv.build_metadata.clone(),
977 }),
978 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
979 table_metadata: m.table_metadata.clone(),
980 version_aux_data: m.version_aux_data as u64,
981 index_section: m.index_section.map(|i| i as u64),
982 timestamp: timestamp_nanos,
983 tag: m.tag.clone().unwrap_or_default(),
984 reader_feature_flags: m.reader_feature_flags,
985 writer_feature_flags: m.writer_feature_flags,
986 max_fragment_id: m.max_fragment_id,
987 transaction_file: m.transaction_file.clone().unwrap_or_default(),
988 next_row_id: m.next_row_id,
989 data_format: Some(pb::manifest::DataStorageFormat {
990 file_format: m.data_storage_format.file_format.clone(),
991 version: m.data_storage_format.version.clone(),
992 }),
993 config: m.config.clone(),
994 base_paths: m
995 .base_paths
996 .values()
997 .map(|base_path| pb::BasePath {
998 id: base_path.id,
999 name: base_path.name.clone(),
1000 is_dataset_root: base_path.is_dataset_root,
1001 path: base_path.path.clone(),
1002 })
1003 .collect(),
1004 transaction_section: m.transaction_section.map(|i| i as u64),
1005 }
1006 }
1007}
1008
1009#[async_trait]
1010pub trait SelfDescribingFileReader {
1011 async fn try_new_self_described(
1019 object_store: &ObjectStore,
1020 path: &Path,
1021 cache: Option<&LanceCache>,
1022 ) -> Result<Self>
1023 where
1024 Self: Sized,
1025 {
1026 let reader = object_store.open(path).await?;
1027 Self::try_new_self_described_from_reader(reader.into(), cache).await
1028 }
1029
1030 async fn try_new_self_described_from_reader(
1031 reader: Arc<dyn Reader>,
1032 cache: Option<&LanceCache>,
1033 ) -> Result<Self>
1034 where
1035 Self: Sized;
1036}
1037
1038#[async_trait]
1039impl SelfDescribingFileReader for PreviousFileReader {
1040 async fn try_new_self_described_from_reader(
1041 reader: Arc<dyn Reader>,
1042 cache: Option<&LanceCache>,
1043 ) -> Result<Self> {
1044 let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
1045 let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
1046 message: format!(
1047 "Attempt to open file at {} as self-describing but it did not contain a manifest",
1048 reader.path(),
1049 ),
1050 location: location!(),
1051 })?;
1052 let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
1053 if manifest.should_use_legacy_format() {
1054 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
1055 }
1056 let schema = manifest.schema;
1057 let max_field_id = schema.max_field_id().unwrap_or_default();
1058 Self::try_new_from_reader(
1059 reader.path(),
1060 reader.clone(),
1061 Some(metadata),
1062 schema,
1063 0,
1064 0,
1065 max_field_id,
1066 cache,
1067 )
1068 .await
1069 }
1070}
1071
1072#[cfg(test)]
1073mod tests {
1074 use crate::format::{DataFile, DeletionFile, DeletionFileType};
1075 use std::num::NonZero;
1076
1077 use super::*;
1078
1079 use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
1080 use lance_core::datatypes::Field;
1081
1082 #[test]
1083 fn test_writer_version() {
1084 let wv = WriterVersion::default();
1085 assert_eq!(wv.library, "lance");
1086
1087 let cargo_version = env!("CARGO_PKG_VERSION");
1089 let expected_tag = if cargo_version.contains('-') {
1090 Some(cargo_version.split('-').nth(1).unwrap())
1091 } else {
1092 None
1093 };
1094
1095 let version_parts: Vec<&str> = wv.version.split('.').collect();
1097 assert_eq!(
1098 version_parts.len(),
1099 3,
1100 "Version should be major.minor.patch"
1101 );
1102 assert!(
1103 !wv.version.contains('-'),
1104 "Version field should not contain prerelease"
1105 );
1106
1107 assert_eq!(wv.prerelease.as_deref(), expected_tag);
1109 assert_eq!(wv.build_metadata, None);
1111
1112 let version = wv.lance_lib_version().unwrap();
1114 assert_eq!(
1115 version.major,
1116 env!("CARGO_PKG_VERSION_MAJOR").parse::<u64>().unwrap()
1117 );
1118 assert_eq!(
1119 version.minor,
1120 env!("CARGO_PKG_VERSION_MINOR").parse::<u64>().unwrap()
1121 );
1122 assert_eq!(
1123 version.patch,
1124 env!("CARGO_PKG_VERSION_PATCH").parse::<u64>().unwrap() + 1
1126 );
1127 assert_eq!(version.pre.as_str(), expected_tag.unwrap_or(""));
1128
1129 for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
1130 let mut bumped_version = version.clone();
1131 bump_version(&mut bumped_version, *part);
1132 assert!(version < bumped_version);
1133 }
1134 }
1135
1136 #[test]
1137 fn test_writer_version_split() {
1138 let (version, prerelease, build_metadata) =
1140 WriterVersion::split_version("2.0.0-rc.1").unwrap();
1141 assert_eq!(version, "2.0.0");
1142 assert_eq!(prerelease, Some("rc.1".to_string()));
1143 assert_eq!(build_metadata, None);
1144
1145 let (version, prerelease, build_metadata) = WriterVersion::split_version("2.0.0").unwrap();
1147 assert_eq!(version, "2.0.0");
1148 assert_eq!(prerelease, None);
1149 assert_eq!(build_metadata, None);
1150
1151 let (version, prerelease, build_metadata) =
1153 WriterVersion::split_version("2.0.0-rc.1+build.123").unwrap();
1154 assert_eq!(version, "2.0.0");
1155 assert_eq!(prerelease, Some("rc.1".to_string()));
1156 assert_eq!(build_metadata, Some("build.123".to_string()));
1157
1158 let (version, prerelease, build_metadata) =
1160 WriterVersion::split_version("2.0.0+build.123").unwrap();
1161 assert_eq!(version, "2.0.0");
1162 assert_eq!(prerelease, None);
1163 assert_eq!(build_metadata, Some("build.123".to_string()));
1164
1165 assert!(WriterVersion::split_version("not-a-version").is_none());
1167 }
1168
1169 #[test]
1170 fn test_writer_version_comparison_with_prerelease() {
1171 let v1 = WriterVersion {
1172 library: "lance".to_string(),
1173 version: "2.0.0".to_string(),
1174 prerelease: Some("rc.1".to_string()),
1175 build_metadata: None,
1176 };
1177
1178 let v2 = WriterVersion {
1179 library: "lance".to_string(),
1180 version: "2.0.0".to_string(),
1181 prerelease: None,
1182 build_metadata: None,
1183 };
1184
1185 let semver1 = v1.lance_lib_version().unwrap();
1186 let semver2 = v2.lance_lib_version().unwrap();
1187
1188 assert!(semver1 < semver2);
1190 }
1191
1192 #[test]
1193 fn test_writer_version_with_build_metadata() {
1194 let v = WriterVersion {
1195 library: "lance".to_string(),
1196 version: "2.0.0".to_string(),
1197 prerelease: Some("rc.1".to_string()),
1198 build_metadata: Some("build.123".to_string()),
1199 };
1200
1201 let semver = v.lance_lib_version().unwrap();
1202 assert_eq!(semver.to_string(), "2.0.0-rc.1+build.123");
1203 assert_eq!(semver.major, 2);
1204 assert_eq!(semver.minor, 0);
1205 assert_eq!(semver.patch, 0);
1206 assert_eq!(semver.pre.as_str(), "rc.1");
1207 assert_eq!(semver.build.as_str(), "build.123");
1208 }
1209
1210 #[test]
1211 fn test_writer_version_non_semver() {
1212 let v = WriterVersion {
1214 library: "lance".to_string(),
1215 version: "custom-build-v1".to_string(),
1216 prerelease: None,
1217 build_metadata: None,
1218 };
1219
1220 assert!(v.lance_lib_version().is_none());
1222
1223 assert_eq!(v.library, "lance");
1225 assert_eq!(v.version, "custom-build-v1");
1226 }
1227
1228 #[test]
1229 #[allow(deprecated)]
1230 fn test_older_than_with_prerelease() {
1231 let v_rc = WriterVersion {
1233 library: "lance".to_string(),
1234 version: "2.0.0".to_string(),
1235 prerelease: Some("rc.1".to_string()),
1236 build_metadata: None,
1237 };
1238
1239 assert!(v_rc.older_than(2, 0, 0));
1241
1242 assert!(v_rc.older_than(2, 0, 1));
1244
1245 assert!(!v_rc.older_than(1, 9, 9));
1247
1248 let v_release = WriterVersion {
1249 library: "lance".to_string(),
1250 version: "2.0.0".to_string(),
1251 prerelease: None,
1252 build_metadata: None,
1253 };
1254
1255 assert!(!v_release.older_than(2, 0, 0));
1257
1258 assert!(v_release.older_than(2, 0, 1));
1260 }
1261
1262 #[test]
1263 fn test_fragments_by_offset_range() {
1264 let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
1265 "a",
1266 arrow_schema::DataType::Int64,
1267 false,
1268 )]);
1269 let schema = Schema::try_from(&arrow_schema).unwrap();
1270 let fragments = vec![
1271 Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
1272 Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
1273 Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
1274 ];
1275 let manifest = Manifest::new(
1276 schema,
1277 Arc::new(fragments),
1278 DataStorageFormat::default(),
1279 HashMap::new(),
1280 );
1281
1282 let actual = manifest.fragments_by_offset_range(0..10);
1283 assert_eq!(actual.len(), 1);
1284 assert_eq!(actual[0].0, 0);
1285 assert_eq!(actual[0].1.id, 0);
1286
1287 let actual = manifest.fragments_by_offset_range(5..15);
1288 assert_eq!(actual.len(), 2);
1289 assert_eq!(actual[0].0, 0);
1290 assert_eq!(actual[0].1.id, 0);
1291 assert_eq!(actual[1].0, 10);
1292 assert_eq!(actual[1].1.id, 1);
1293
1294 let actual = manifest.fragments_by_offset_range(15..50);
1295 assert_eq!(actual.len(), 2);
1296 assert_eq!(actual[0].0, 10);
1297 assert_eq!(actual[0].1.id, 1);
1298 assert_eq!(actual[1].0, 25);
1299 assert_eq!(actual[1].1.id, 2);
1300
1301 let actual = manifest.fragments_by_offset_range(45..100);
1303 assert!(actual.is_empty());
1304
1305 assert!(manifest.fragments_by_offset_range(200..400).is_empty());
1306 }
1307
#[test]
fn test_max_field_id() {
    // Two Int64 schema fields; their ids are assigned via `set_id` using the
    // seeds 0 and 2 (parent id -1 in both cases).
    let mut seed_a = 0;
    let mut field_a =
        Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
    field_a.set_id(-1, &mut seed_a);

    let mut seed_b = 2;
    let mut field_b =
        Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
    field_b.set_id(-1, &mut seed_b);

    let schema = Schema {
        fields: vec![field_a, field_b],
        metadata: Default::default(),
    };

    // Builds a fragment that carries only an id and data files; every
    // optional piece of metadata is left unset.
    let bare_fragment = |id, files| Fragment {
        id,
        files,
        deletion_file: None,
        row_id_meta: None,
        physical_rows: None,
        created_at_version_meta: None,
        last_updated_at_version_meta: None,
    };

    let fragments = vec![
        bare_fragment(
            0,
            vec![DataFile::new_legacy_from_fields(
                "path1",
                vec![0, 1, 2],
                None,
            )],
        ),
        bare_fragment(
            1,
            vec![
                // 43 appears only in this data file's field list.
                DataFile::new_legacy_from_fields("path2", vec![0, 1, 43], None),
                DataFile::new_legacy_from_fields("path3", vec![2], None),
            ],
        ),
    ];

    let manifest = Manifest::new(
        schema,
        Arc::new(fragments),
        DataStorageFormat::default(),
        HashMap::new(),
    );

    // The maximum must account for field ids referenced by data files,
    // not only the ones declared in the schema.
    assert_eq!(manifest.max_field_id(), 43);
}
1359
#[test]
fn test_config() {
    let column = ArrowField::new("a", arrow_schema::DataType::Int64, false);
    let schema = Schema::try_from(&ArrowSchema::new(vec![column])).unwrap();
    let fragments = vec![
        Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
        Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
        Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
    ];
    let mut manifest = Manifest::new(
        schema,
        Arc::new(fragments),
        DataStorageFormat::default(),
        HashMap::new(),
    );

    // Start from the manifest's current config and add two entries to form
    // the expected state.
    let mut expected = manifest.config.clone();
    expected.extend([
        ("lance.test".to_string(), "value".to_string()),
        ("other-key".to_string(), "other-value".to_string()),
    ]);

    // Entries written through `config_mut` must be visible on `config`.
    manifest.config_mut().extend(expected.clone());
    assert_eq!(manifest.config, expected);

    // Removing a key through `config_mut` must be reflected as well.
    expected.remove("other-key");
    manifest.config_mut().remove("other-key");
    assert_eq!(manifest.config, expected);
}
1391
#[test]
fn test_manifest_summary() {
    // Checks the listed summary fields, in order, against their expected
    // values. Kept as a macro so no assumption is made about field types.
    macro_rules! assert_summary {
        ($summary:expr, { $($field:ident: $expected:expr),+ $(,)? }) => {{
            let summary = &$summary;
            $(assert_eq!(summary.$field, $expected);)+
        }};
    }

    let schema = Schema::try_from(&ArrowSchema::new(vec![
        ArrowField::new("id", arrow_schema::DataType::Int64, false),
        ArrowField::new("name", arrow_schema::DataType::Utf8, true),
    ]))
    .unwrap();

    // Case 1: a manifest without any fragments reports zero everywhere.
    let empty_manifest = Manifest::new(
        schema.clone(),
        Arc::new(Vec::new()),
        DataStorageFormat::default(),
        HashMap::new(),
    );
    let empty_summary = empty_manifest.summary();
    assert_summary!(empty_summary, {
        total_rows: 0,
        total_files_size: 0,
        total_fragments: 0,
        total_data_files: 0,
        total_deletion_file_rows: 0,
        total_data_file_rows: 0,
        total_deletion_files: 0,
    });

    // Case 2: fragments exist but were created with a row count of zero,
    // so only the fragment and data file counters are non-zero.
    let empty_files_manifest = Manifest::new(
        schema.clone(),
        Arc::new(vec![
            Fragment::with_file_legacy(0, "empty_file1.lance", &schema, Some(0)),
            Fragment::with_file_legacy(1, "empty_file2.lance", &schema, Some(0)),
        ]),
        DataStorageFormat::default(),
        HashMap::new(),
    );
    let empty_files_summary = empty_files_manifest.summary();
    assert_summary!(empty_files_summary, {
        total_rows: 0,
        total_files_size: 0,
        total_fragments: 2,
        total_data_files: 2,
        total_deletion_file_rows: 0,
        total_data_file_rows: 0,
        total_deletion_files: 0,
    });

    // Case 3: three fragments created with 100, 250 and 75 rows
    // (425 in total) and no deletions. The files size is still expected
    // to be reported as 0 for these legacy files.
    let real_data_manifest = Manifest::new(
        schema.clone(),
        Arc::new(vec![
            Fragment::with_file_legacy(0, "data_file1.lance", &schema, Some(100)),
            Fragment::with_file_legacy(1, "data_file2.lance", &schema, Some(250)),
            Fragment::with_file_legacy(2, "data_file3.lance", &schema, Some(75)),
        ]),
        DataStorageFormat::default(),
        HashMap::new(),
    );
    let real_data_summary = real_data_manifest.summary();
    assert_summary!(real_data_summary, {
        total_rows: 425,
        total_files_size: 0,
        total_fragments: 3,
        total_data_files: 3,
        total_deletion_file_rows: 0,
        total_data_file_rows: 425,
        total_deletion_files: 0,
    });

    // Case 4: one fragment with 50 physical rows, a data file built with
    // `NonZero::new(1000)` and a deletion file recording 10 deleted rows.
    let file_version = LanceFileVersion::default();
    let deletion_file = DeletionFile {
        read_version: 123,
        id: 456,
        file_type: DeletionFileType::Array,
        num_deleted_rows: Some(10),
        base_id: None,
    };
    let mut fragment_with_deletion = Fragment::new(0)
        .with_file(
            "data_with_deletion.lance",
            vec![0, 1],
            vec![0, 1],
            &file_version,
            NonZero::new(1000),
        )
        .with_physical_rows(50);
    fragment_with_deletion.deletion_file = Some(deletion_file);

    let manifest_with_deletion = Manifest::new(
        schema,
        Arc::new(vec![fragment_with_deletion]),
        DataStorageFormat::default(),
        HashMap::new(),
    );
    let deletion_summary = manifest_with_deletion.summary();
    assert_summary!(deletion_summary, {
        // 50 physical rows minus 10 deleted rows.
        total_rows: 40,
        total_files_size: 1000,
        total_fragments: 1,
        total_data_files: 1,
        total_deletion_file_rows: 10,
        total_data_file_rows: 50,
        total_deletion_files: 1,
    });

    // Converting the summary into a string map must yield one entry per
    // summary field checked above (seven in total).
    let stats_map: BTreeMap<String, String> = deletion_summary.into();
    assert_eq!(stats_map.len(), 7);
}
1501}