1use std::collections::HashMap;
5use std::ops::Range;
6use std::sync::Arc;
7
8use async_trait::async_trait;
9use chrono::prelude::*;
10use deepsize::DeepSizeOf;
11use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
12use lance_file::reader::FileReader;
13use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
14use lance_io::traits::{ProtoStruct, Reader};
15use object_store::path::Path;
16use prost::Message;
17use prost_types::Timestamp;
18
19use super::Fragment;
20use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
21use crate::format::pb;
22use lance_core::cache::LanceCache;
23use lance_core::datatypes::{Schema, StorageClass};
24use lance_core::{Error, Result};
25use lance_io::object_store::ObjectStore;
26use lance_io::utils::read_struct;
27use snafu::location;
28
29#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
36pub struct Manifest {
37 pub schema: Schema,
39
40 pub local_schema: Schema,
42
43 pub version: u64,
45
46 pub writer_version: Option<WriterVersion>,
48
49 pub fragments: Arc<Vec<Fragment>>,
54
55 pub version_aux_data: usize,
57
58 pub index_section: Option<usize>,
60
61 pub timestamp_nanos: u128,
63
64 pub tag: Option<String>,
66
67 pub reader_feature_flags: u64,
69
70 pub writer_feature_flags: u64,
72
73 pub max_fragment_id: Option<u32>,
76
77 pub transaction_file: Option<String>,
79
80 fragment_offsets: Vec<usize>,
83
84 pub next_row_id: u64,
86
87 pub data_storage_format: DataStorageFormat,
89
90 pub config: HashMap<String, String>,
92
93 pub blob_dataset_version: Option<u64>,
95}
96
/// Mask selecting the most significant bit of a `u64` version number.
/// [`is_detached_version`] reports whether that bit is set.
pub const DETACHED_VERSION_MASK: u64 = 1 << 63;
99
100pub fn is_detached_version(version: u64) -> bool {
101 version & DETACHED_VERSION_MASK != 0
102}
103
104fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
105 fragments
106 .iter()
107 .map(|f| f.num_rows().unwrap_or_default())
108 .chain([0]) .scan(0_usize, |offset, len| {
110 let start = *offset;
111 *offset += len;
112 Some(start)
113 })
114 .collect()
115}
116
117impl Manifest {
118 pub fn new(
119 schema: Schema,
120 fragments: Arc<Vec<Fragment>>,
121 data_storage_format: DataStorageFormat,
122 blob_dataset_version: Option<u64>,
123 ) -> Self {
124 let fragment_offsets = compute_fragment_offsets(&fragments);
125 let local_schema = schema.retain_storage_class(StorageClass::Default);
126
127 Self {
128 schema,
129 local_schema,
130 version: 1,
131 writer_version: Some(WriterVersion::default()),
132 fragments,
133 version_aux_data: 0,
134 index_section: None,
135 timestamp_nanos: 0,
136 tag: None,
137 reader_feature_flags: 0,
138 writer_feature_flags: 0,
139 max_fragment_id: None,
140 transaction_file: None,
141 fragment_offsets,
142 next_row_id: 0,
143 data_storage_format,
144 config: HashMap::new(),
145 blob_dataset_version,
146 }
147 }
148
149 pub fn new_from_previous(
150 previous: &Self,
151 schema: Schema,
152 fragments: Arc<Vec<Fragment>>,
153 new_blob_version: Option<u64>,
154 ) -> Self {
155 let fragment_offsets = compute_fragment_offsets(&fragments);
156 let local_schema = schema.retain_storage_class(StorageClass::Default);
157
158 let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);
159
160 Self {
161 schema,
162 local_schema,
163 version: previous.version + 1,
164 writer_version: Some(WriterVersion::default()),
165 fragments,
166 version_aux_data: 0,
167 index_section: None, timestamp_nanos: 0, tag: None,
170 reader_feature_flags: 0, writer_feature_flags: 0, max_fragment_id: previous.max_fragment_id,
173 transaction_file: None,
174 fragment_offsets,
175 next_row_id: previous.next_row_id,
176 data_storage_format: previous.data_storage_format.clone(),
177 config: previous.config.clone(),
178 blob_dataset_version,
179 }
180 }
181
182 pub fn timestamp(&self) -> DateTime<Utc> {
184 let nanos = self.timestamp_nanos % 1_000_000_000;
185 let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
186 Utc.from_utc_datetime(
187 &DateTime::from_timestamp(seconds, nanos as u32)
188 .unwrap_or_default()
189 .naive_utc(),
190 )
191 }
192
193 pub fn set_timestamp(&mut self, nanos: u128) {
195 self.timestamp_nanos = nanos;
196 }
197
198 pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
200 self.config.extend(upsert_values);
201 }
202
203 pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
205 self.config
206 .retain(|key, _| !delete_keys.contains(&key.as_str()));
207 }
208
209 pub fn replace_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
211 self.schema.metadata = new_metadata;
212 }
213
214 pub fn replace_field_metadata(
218 &mut self,
219 field_id: i32,
220 new_metadata: HashMap<String, String>,
221 ) -> Result<()> {
222 if let Some(field) = self.schema.field_by_id_mut(field_id) {
223 field.metadata = new_metadata;
224 Ok(())
225 } else {
226 Err(Error::invalid_input(
227 format!(
228 "Field with id {} does not exist for replace_field_metadata",
229 field_id
230 ),
231 location!(),
232 ))
233 }
234 }
235
236 pub fn update_max_fragment_id(&mut self) {
238 if self.fragments.is_empty() {
240 return;
241 }
242
243 let max_fragment_id = self
244 .fragments
245 .iter()
246 .map(|f| f.id)
247 .max()
248 .unwrap() .try_into()
250 .unwrap();
251
252 match self.max_fragment_id {
253 None => {
254 self.max_fragment_id = Some(max_fragment_id);
256 }
257 Some(current_max) => {
258 if max_fragment_id > current_max {
261 self.max_fragment_id = Some(max_fragment_id);
262 }
263 }
264 }
265 }
266
267 pub fn max_fragment_id(&self) -> Option<u64> {
272 if let Some(max_id) = self.max_fragment_id {
273 Some(max_id.into())
275 } else {
276 self.fragments.iter().map(|f| f.id).max()
278 }
279 }
280
281 pub fn max_field_id(&self) -> i32 {
286 let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
287 let fragment_max_id = self
288 .fragments
289 .iter()
290 .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
291 .max()
292 .copied();
293 let fragment_max_id = fragment_max_id.unwrap_or(-1);
294 schema_max_id.max(fragment_max_id)
295 }
296
297 pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
300 if since.version >= self.version {
301 return Err(Error::io(
302 format!(
303 "fragments_since: given version {} is newer than manifest version {}",
304 since.version, self.version
305 ),
306 location!(),
307 ));
308 }
309 let start = since.max_fragment_id();
310 Ok(self
311 .fragments
312 .iter()
313 .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
314 .cloned()
315 .collect())
316 }
317
318 pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
334 let start = range.start;
335 let end = range.end;
336 let idx = self
337 .fragment_offsets
338 .binary_search(&start)
339 .unwrap_or_else(|idx| idx - 1);
340
341 let mut fragments = vec![];
342 for i in idx..self.fragments.len() {
343 if self.fragment_offsets[i] >= end
344 || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
345 <= start
346 {
347 break;
348 }
349 fragments.push((self.fragment_offsets[i], &self.fragments[i]));
350 }
351
352 fragments
353 }
354
355 pub fn uses_move_stable_row_ids(&self) -> bool {
357 self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
358 }
359
360 pub fn serialized(&self) -> Vec<u8> {
363 let pb_manifest: pb::Manifest = self.into();
364 pb_manifest.encode_to_vec()
365 }
366
367 pub fn should_use_legacy_format(&self) -> bool {
368 self.data_storage_format.version == LEGACY_FORMAT_VERSION
369 }
370}
371
372#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
373pub struct WriterVersion {
374 pub library: String,
375 pub version: String,
376}
377
378#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
379pub struct DataStorageFormat {
380 pub file_format: String,
381 pub version: String,
382}
383
384const LANCE_FORMAT_NAME: &str = "lance";
385
386impl DataStorageFormat {
387 pub fn new(version: LanceFileVersion) -> Self {
388 Self {
389 file_format: LANCE_FORMAT_NAME.to_string(),
390 version: version.resolve().to_string(),
391 }
392 }
393
394 pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
395 self.version.parse::<LanceFileVersion>()
396 }
397}
398
399impl Default for DataStorageFormat {
400 fn default() -> Self {
401 Self::new(LanceFileVersion::default())
402 }
403}
404
405impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
406 fn from(pb: pb::manifest::DataStorageFormat) -> Self {
407 Self {
408 file_format: pb.file_format,
409 version: pb.version,
410 }
411 }
412}
413
/// Selects which numeric component of a version `WriterVersion::bump`
/// increments.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    /// The first component.
    Major,
    /// The second component.
    Minor,
    /// The third component.
    Patch,
}
420
421impl WriterVersion {
422 pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
425 let mut parts = self.version.split('.');
426 let major = parts.next().unwrap_or("0").parse().ok()?;
427 let minor = parts.next().unwrap_or("0").parse().ok()?;
428 let patch = parts.next().unwrap_or("0").parse().ok()?;
429 let tag = parts.next();
430 Some((major, minor, patch, tag))
431 }
432
433 pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
434 self.semver()
435 .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
436 }
437
438 pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
440 let version = self.semver_or_panic();
441 (version.0, version.1, version.2) < (major, minor, patch)
442 }
443
444 pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
445 let parts = self.semver_or_panic();
446 let tag = if keep_tag { parts.3 } else { None };
447 let new_parts = match part {
448 VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
449 VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
450 VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
451 };
452 let new_version = if let Some(tag) = tag {
453 format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
454 } else {
455 format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
456 };
457 Self {
458 library: self.library.clone(),
459 version: new_version,
460 }
461 }
462}
463
464impl Default for WriterVersion {
465 #[cfg(not(test))]
466 fn default() -> Self {
467 Self {
468 library: "lance".to_string(),
469 version: env!("CARGO_PKG_VERSION").to_string(),
470 }
471 }
472
473 #[cfg(test)]
475 fn default() -> Self {
476 Self {
477 library: "lance".to_string(),
478 version: env!("CARGO_PKG_VERSION").to_string(),
479 }
480 .bump(VersionPart::Patch, true)
481 }
482}
483
484impl ProtoStruct for Manifest {
485 type Proto = pb::Manifest;
486}
487
488impl TryFrom<pb::Manifest> for Manifest {
489 type Error = Error;
490
491 fn try_from(p: pb::Manifest) -> Result<Self> {
492 let timestamp_nanos = p.timestamp.map(|ts| {
493 let sec = ts.seconds as u128 * 1e9 as u128;
494 let nanos = ts.nanos as u128;
495 sec + nanos
496 });
497 let writer_version = match p.writer_version {
499 Some(pb::manifest::WriterVersion { library, version }) => {
500 Some(WriterVersion { library, version })
501 }
502 _ => None,
503 };
504 let fragments = Arc::new(
505 p.fragments
506 .into_iter()
507 .map(Fragment::try_from)
508 .collect::<Result<Vec<_>>>()?,
509 );
510 let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
511 let fields_with_meta = FieldsWithMeta {
512 fields: Fields(p.fields),
513 metadata: p.metadata,
514 };
515
516 if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
517 && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
518 {
519 return Err(Error::Internal {
520 message: "All fragments must have row ids".into(),
521 location: location!(),
522 });
523 }
524
525 let data_storage_format = match p.data_format {
526 None => {
527 if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
528 DataStorageFormat::new(inferred_version)
530 } else {
531 if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
533 DataStorageFormat::new(LanceFileVersion::Stable)
534 } else {
535 DataStorageFormat::new(LanceFileVersion::Legacy)
536 }
537 }
538 }
539 Some(format) => DataStorageFormat::from(format),
540 };
541
542 let schema = Schema::from(fields_with_meta);
543 let local_schema = schema.retain_storage_class(StorageClass::Default);
544
545 Ok(Self {
546 schema,
547 local_schema,
548 version: p.version,
549 writer_version,
550 version_aux_data: p.version_aux_data as usize,
551 index_section: p.index_section.map(|i| i as usize),
552 timestamp_nanos: timestamp_nanos.unwrap_or(0),
553 tag: if p.tag.is_empty() { None } else { Some(p.tag) },
554 reader_feature_flags: p.reader_feature_flags,
555 writer_feature_flags: p.writer_feature_flags,
556 max_fragment_id: p.max_fragment_id,
557 fragments,
558 transaction_file: if p.transaction_file.is_empty() {
559 None
560 } else {
561 Some(p.transaction_file)
562 },
563 fragment_offsets,
564 next_row_id: p.next_row_id,
565 data_storage_format,
566 config: p.config,
567 blob_dataset_version: if p.blob_dataset_version == 0 {
568 None
569 } else {
570 Some(p.blob_dataset_version)
571 },
572 })
573 }
574}
575
576impl From<&Manifest> for pb::Manifest {
577 fn from(m: &Manifest) -> Self {
578 let timestamp_nanos = if m.timestamp_nanos == 0 {
579 None
580 } else {
581 let nanos = m.timestamp_nanos % 1e9 as u128;
582 let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
583 Some(Timestamp {
584 seconds,
585 nanos: nanos as i32,
586 })
587 };
588 let fields_with_meta: FieldsWithMeta = (&m.schema).into();
589 Self {
590 fields: fields_with_meta.fields.0,
591 version: m.version,
592 writer_version: m
593 .writer_version
594 .as_ref()
595 .map(|wv| pb::manifest::WriterVersion {
596 library: wv.library.clone(),
597 version: wv.version.clone(),
598 }),
599 fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
600 metadata: fields_with_meta.metadata,
601 version_aux_data: m.version_aux_data as u64,
602 index_section: m.index_section.map(|i| i as u64),
603 timestamp: timestamp_nanos,
604 tag: m.tag.clone().unwrap_or_default(),
605 reader_feature_flags: m.reader_feature_flags,
606 writer_feature_flags: m.writer_feature_flags,
607 max_fragment_id: m.max_fragment_id,
608 transaction_file: m.transaction_file.clone().unwrap_or_default(),
609 next_row_id: m.next_row_id,
610 data_format: Some(pb::manifest::DataStorageFormat {
611 file_format: m.data_storage_format.file_format.clone(),
612 version: m.data_storage_format.version.clone(),
613 }),
614 config: m.config.clone(),
615 blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
616 }
617 }
618}
619
620#[async_trait]
621pub trait SelfDescribingFileReader {
622 async fn try_new_self_described(
630 object_store: &ObjectStore,
631 path: &Path,
632 cache: Option<&LanceCache>,
633 ) -> Result<Self>
634 where
635 Self: Sized,
636 {
637 let reader = object_store.open(path).await?;
638 Self::try_new_self_described_from_reader(reader.into(), cache).await
639 }
640
641 async fn try_new_self_described_from_reader(
642 reader: Arc<dyn Reader>,
643 cache: Option<&LanceCache>,
644 ) -> Result<Self>
645 where
646 Self: Sized;
647}
648
649#[async_trait]
650impl SelfDescribingFileReader for FileReader {
651 async fn try_new_self_described_from_reader(
652 reader: Arc<dyn Reader>,
653 cache: Option<&LanceCache>,
654 ) -> Result<Self> {
655 let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
656 let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
657 message: format!(
658 "Attempt to open file at {} as self-describing but it did not contain a manifest",
659 reader.path(),
660 ),
661 location: location!(),
662 })?;
663 let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
664 if manifest.should_use_legacy_format() {
665 populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
666 }
667 let schema = manifest.schema;
668 let max_field_id = schema.max_field_id().unwrap_or_default();
669 Self::try_new_from_reader(
670 reader.path(),
671 reader.clone(),
672 Some(metadata),
673 schema,
674 0,
675 0,
676 max_field_id,
677 cache,
678 )
679 .await
680 }
681}
682
#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    /// Manifest with a single Int64 column "a" and three legacy fragments of
    /// 10, 15 and 20 rows (ids 0, 1, 2), so offsets are 0, 10, 25 and the
    /// total is 45.
    fn three_fragment_manifest() -> Manifest {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        )
    }

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");

        // Under test the default version is the crate version with the patch
        // component bumped by one and no tag.
        let parts = wv.semver().unwrap();
        let expected = (
            env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
            env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
            env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
            None,
        );
        assert_eq!(parts, expected);
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );

        // Bumping any component yields a strictly newer version.
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let (major, minor, patch, _) = bumped.semver_or_panic();
            assert!(wv.older_than(major, minor, patch));
        }
    }

    #[test]
    fn test_fragments_by_offset_range() {
        let manifest = three_fragment_manifest();

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!((actual[0].0, actual[0].1.id), (0, 0));

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!((actual[0].0, actual[0].1.id), (0, 0));
        assert_eq!((actual[1].0, actual[1].1.id), (10, 1));

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!((actual[0].0, actual[0].1.id), (10, 1));
        assert_eq!((actual[1].0, actual[1].1.id), (25, 2));

        // Ranges starting at or past the total row count match nothing.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        // Builds an Int64 field and assigns it an id starting from `seed`.
        let int_field = |name: &str, mut seed: i32| {
            let mut field =
                Field::try_from(ArrowField::new(name, arrow_schema::DataType::Int64, false))
                    .unwrap();
            field.set_id(-1, &mut seed);
            field
        };
        let field0 = int_field("a", 0);
        let field2 = int_field("b", 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        // The largest id (43) only appears in a data file, not in the schema.
        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let mut manifest = three_fragment_manifest();

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}