use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;

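/// Manifest of a dataset version.
///
/// Tracks the schema, fragments, feature flags, and other metadata that
/// describe a single version of a dataset.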
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    pub schema: Schema,

    pub local_schema: Schema,

    pub version: u64,

    pub writer_version: Option<WriterVersion>,

    pub fragments: Arc<Vec<Fragment>>,

    pub version_aux_data: usize,

    pub index_section: Option<usize>,

    pub timestamp_nanos: u128,

    pub tag: Option<String>,

    pub reader_feature_flags: u64,

    pub writer_feature_flags: u64,

    pub max_fragment_id: u32,

    pub transaction_file: Option<String>,

    fragment_offsets: Vec<usize>,

    pub next_row_id: u64,

    pub data_storage_format: DataStorageFormat,

    pub config: HashMap<String, String>,

    pub blob_dataset_version: Option<u64>,
}

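/// Bit mask set on the version number of a detached manifest, i.e. one that is
/// not part of the dataset's mainline version history.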
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;

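/// Returns true if the version number has the detached bit set.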
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}

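/// Compute the starting row offset of each fragment, plus a final entry
/// holding the total row count.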
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        .chain([0])
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

impl Manifest {
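    /// Create a new manifest (at version 1) from a schema and a set of fragments.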
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: 0,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

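    /// Create the manifest for the next version of a dataset, carrying over the
    /// max fragment id, next row id, data storage format, and config from `previous`.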
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }

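    /// Return the UTC timestamp at which this version was created.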
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

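    /// Set the creation timestamp, in nanoseconds since the UNIX epoch.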
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

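    /// Insert or update the given key/value pairs in the dataset config.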
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

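    /// Remove the given keys from the dataset config.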
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

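    /// Replace the schema metadata with the given key/value pairs.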
    pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

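    /// Replace the metadata of the field with `field_id`; a no-op if no such
    /// field exists.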
    pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
        }
    }

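    /// Recompute `max_fragment_id` from the current fragment list, keeping the
    /// stored value if it is already larger.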
    pub fn update_max_fragment_id(&mut self) {
        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap_or_default()
            .try_into()
            .unwrap();

        if max_fragment_id > self.max_fragment_id {
            self.max_fragment_id = max_fragment_id;
        }
    }

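    /// Return the highest fragment id used so far, falling back to scanning the
    /// fragment list when no maximum has been recorded.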
    pub fn max_fragment_id(&self) -> Option<u64> {
        if self.max_fragment_id == 0 {
            self.fragments.iter().map(|f| f.id).max()
        } else {
            Some(self.max_fragment_id.into())
        }
    }

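    /// Return the highest field id across the schema and all data files, or -1
    /// if there are none.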
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

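    /// Return the fragments that were added after the given (older) manifest.
    ///
    /// Errors if `since` is not actually an older version than `self`.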
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }

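    /// Return the fragments that overlap the given range of row offsets, along
    /// with the starting row offset of each fragment.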
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

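    /// Whether the dataset uses move-stable row ids.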
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

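    /// Serialize the manifest to its protobuf wire format.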
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

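    /// Whether data files are stored in the legacy (pre-v2) file format.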
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}

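/// The library and version of the writer that created a manifest.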
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

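/// The file format and format version used to store data files.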
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

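/// A component of a semantic version number.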
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
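    /// Parse the version string as `major.minor.patch[.tag]`, returning `None`
    /// if any numeric part fails to parse.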
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

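    /// Like [`Self::semver`], but panics if the version string is invalid.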
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

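    /// Whether this writer version is strictly older than `major.minor.patch`.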
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }

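    /// Return a copy with the given version part incremented, optionally
    /// keeping the tag.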
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

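    // In tests, report a patch-bumped version so version comparisons can be
    // exercised (see `test_writer_version`).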
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1e9 as u128;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

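        // When the manifest does not record a data storage format, infer it
        // from the fragments if possible, otherwise fall back to the writer
        // feature flags.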
        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            fragments,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1e9 as u128;
            let seconds = ((m.timestamp_nanos - nanos) / 1e9 as u128) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
        }
    }
}

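/// Read a Lance file that embeds its own [`Manifest`], taking the schema from
/// the file itself rather than from an external manifest.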
#[async_trait]
pub trait SelfDescribingFileReader {
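    /// Open the file at `path`, reading its embedded manifest to obtain the
    /// schema.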
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

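    /// Like [`Self::try_new_self_described`], but from an already-opened reader.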
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }

    #[test]
    fn test_max_field_id() {
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}