use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::prelude::*;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::FileMetadataCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;

/// Manifest of a dataset: schema, version, fragments, and indices.
#[derive(Debug, Clone, PartialEq)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Schema of the fields stored locally (default storage class only).
    pub local_schema: Schema,

    /// Dataset version.
    pub version: u64,

    /// Version of the writer library that committed this manifest.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces that make up the dataset.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version auxiliary data.
    pub version_aux_data: usize,

    /// The file position of the index metadata section, if the dataset has indices.
    pub index_section: Option<usize>,

    /// The creation timestamp, in nanoseconds since the UNIX epoch.
    pub timestamp_nanos: u128,

    /// An optional tag for this version.
    pub tag: Option<String>,

    /// Feature flags a reader must understand to read the dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write the dataset.
    pub writer_feature_flags: u64,

    /// The highest fragment id that has been used so far.
    pub max_fragment_id: u32,

    /// The path of the transaction file, if any.
    pub transaction_file: Option<String>,

    /// Precomputed starting row offset of each fragment; the final entry is the total row count.
    fragment_offsets: Vec<usize>,

    /// The next unused row id, used when move-stable row ids are enabled.
    pub next_row_id: u64,

    /// The storage format of the data files.
    pub data_storage_format: DataStorageFormat,

    /// Key-value configuration of the dataset.
    pub config: HashMap<String, String>,

    /// The version of the associated blob dataset, if any.
    pub blob_dataset_version: Option<u64>,
}

/// Bit mask marking a dataset version as detached (not part of the mainline history).
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

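/// # Example
///
/// A minimal illustrative sketch (not compiled as a doctest):
///
/// ```ignore
/// // A version number with the high bit set is a detached version.
/// assert!(is_detached_version(DETACHED_VERSION_MASK | 42));
/// // Ordinary mainline versions are not detached.
/// assert!(!is_detached_version(42));
/// ```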
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}

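/// Compute the starting row offset of each fragment, plus a final entry holding the
/// total row count.
///
/// # Example
///
/// An illustrative sketch (not compiled as a doctest), assuming three fragments with
/// 10, 15 and 20 physical rows:
///
/// ```ignore
/// let offsets = compute_fragment_offsets(&fragments);
/// assert_eq!(offsets, vec![0, 10, 25, 45]);
/// ```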
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        // Chain a zero-length entry so the final offset equals the total row count.
        .chain([0])
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

impl Manifest {
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: 0,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }

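    /// Return the creation timestamp as a UTC datetime.
    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming a mutable `manifest`:
    ///
    /// ```ignore
    /// // 1 second and 500 ns after the UNIX epoch.
    /// manifest.set_timestamp(1_000_000_500);
    /// assert_eq!(manifest.timestamp(), DateTime::from_timestamp(1, 500).unwrap());
    /// ```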
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the creation timestamp, in nanoseconds since the UNIX epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Insert or update the given key-value pairs in the dataset config.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Remove the given keys from the dataset config.
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replace the schema metadata with the given key-value pairs.
    pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replace the metadata of the field with the given id, if such a field exists.
    pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
        }
    }

    /// Bump `max_fragment_id` if any current fragment has a higher id.
    pub fn update_max_fragment_id(&mut self) {
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap_or_default()
            .try_into()
            .unwrap();

        if max_fragment_id > self.max_fragment_id {
            self.max_fragment_id = max_fragment_id;
        }
    }

    /// Return the highest fragment id in use, or `None` if there are no fragments.
    ///
    /// If the stored `max_fragment_id` is 0, this falls back to scanning the fragments.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if self.max_fragment_id == 0 {
            self.fragments.iter().map(|f| f.id).max()
        } else {
            Some(self.max_fragment_id.into())
        }
    }

    /// Return the max field id used by the schema or any data file.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

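    /// Return the fragments that were added after the given previous manifest.
    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest), assuming `current` and `previous`
    /// are manifests of the same dataset with `current.version > previous.version`:
    ///
    /// ```ignore
    /// // Fragments whose id is greater than the previous max fragment id.
    /// let new_fragments = current.fragments_since(&previous)?;
    /// // Asking the question in the wrong direction is an error.
    /// assert!(previous.fragments_since(&current).is_err());
    /// ```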
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

    /// Find the fragments that contain rows in the given row-offset range.
    ///
    /// Returns a vector of `(starting_row_offset, fragment)` pairs.
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// Whether the dataset uses move-stable row ids.
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

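    /// Serialize the manifest into its protobuf wire format.
    ///
    /// # Example
    ///
    /// A minimal round-trip sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let bytes = manifest.serialized();
    /// let decoded = pb::Manifest::decode(bytes.as_slice()).unwrap();
    /// let roundtrip = Manifest::try_from(decoded).unwrap();
    /// assert_eq!(roundtrip.version, manifest.version);
    /// ```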
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}

#[derive(Debug, Clone, PartialEq)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

#[derive(Debug, Clone, PartialEq)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
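    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let format = DataStorageFormat::new(LanceFileVersion::Stable);
    /// assert_eq!(format.file_format, "lance");
    /// // The stored version string can be parsed back into a LanceFileVersion.
    /// assert!(format.lance_file_version().is_ok());
    /// ```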
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
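    /// Try to parse the version string as `(major, minor, patch, optional tag)`.
    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest):
    ///
    /// ```ignore
    /// let wv = WriterVersion {
    ///     library: "lance".to_string(),
    ///     version: "0.10.1".to_string(),
    /// };
    /// assert_eq!(wv.semver(), Some((0, 10, 1, None)));
    /// ```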
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// Whether this writer version is strictly older than `major.minor.patch`.
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }

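    /// Return a copy of this version with the given part incremented.
    ///
    /// # Example
    ///
    /// A minimal sketch (not compiled as a doctest); note that lower parts are not
    /// reset to zero:
    ///
    /// ```ignore
    /// let wv = WriterVersion {
    ///     library: "lance".to_string(),
    ///     version: "0.10.1".to_string(),
    /// };
    /// assert_eq!(wv.bump(VersionPart::Minor, false).version, "0.11.1");
    /// ```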
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // In tests, bump the patch version so it differs from the released library version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        // Older manifests did not record the data storage format; infer it from the
        // fragments or, failing that, from the writer feature flags.
        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            fragments,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1e9 as u128;
            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
        }
    }
}

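/// Open Lance files that embed their own manifest (and thus their own schema).
///
/// # Example
///
/// A minimal sketch (not compiled as a doctest), assuming `object_store: &ObjectStore`
/// and `path: &Path` point at a self-describing Lance file:
///
/// ```ignore
/// let reader = FileReader::try_new_self_described(object_store, path, None).await?;
/// ```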
#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a reader for the file at the given path, reading the schema from the
    /// manifest embedded in the file itself.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&FileMetadataCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // WriterVersion::default() bumps the patch version under cfg(test).
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Out of range
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}