use std::collections::HashMap;
use std::ops::Range;
use std::sync::Arc;

use async_trait::async_trait;
use chrono::prelude::*;
use deepsize::DeepSizeOf;
use lance_file::datatypes::{populate_schema_dictionary, Fields, FieldsWithMeta};
use lance_file::reader::FileReader;
use lance_file::version::{LanceFileVersion, LEGACY_FORMAT_VERSION};
use lance_io::traits::{ProtoStruct, Reader};
use object_store::path::Path;
use prost::Message;
use prost_types::Timestamp;

use super::Fragment;
use crate::feature_flags::{has_deprecated_v2_feature_flag, FLAG_MOVE_STABLE_ROW_IDS};
use crate::format::pb;
use lance_core::cache::LanceCache;
use lance_core::datatypes::{Schema, StorageClass};
use lance_core::{Error, Result};
use lance_io::object_store::ObjectStore;
use lance_io::utils::read_struct;
use snafu::location;
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct Manifest {
    /// Dataset schema.
    pub schema: Schema,

    /// Dataset schema restricted to fields with the default storage class.
    pub local_schema: Schema,

    /// Dataset version.
    pub version: u64,

    /// Version of the writer library that wrote this manifest, if known.
    pub writer_version: Option<WriterVersion>,

    /// Fragments, the pieces that make up this version of the dataset.
    pub fragments: Arc<Vec<Fragment>>,

    /// The file position of the version auxiliary data.
    pub version_aux_data: usize,

    /// The file position of the index metadata section, if any.
    pub index_section: Option<usize>,

    /// The creation timestamp, in nanoseconds since the UNIX epoch.
    pub timestamp_nanos: u128,

    /// An optional tag for this version.
    pub tag: Option<String>,

    /// Feature flags a reader must understand to read this dataset.
    pub reader_feature_flags: u64,

    /// Feature flags a writer must understand to write this dataset.
    pub writer_feature_flags: u64,

    /// The highest fragment id used so far, if any.
    pub max_fragment_id: Option<u32>,

    /// Path to the transaction file, if any.
    pub transaction_file: Option<String>,

    /// Precomputed starting row offset of each fragment (prefix sums of the
    /// fragment row counts), used to map row offsets back to fragments.
    fragment_offsets: Vec<usize>,

    /// The next unused row id, used when stable row ids are enabled.
    pub next_row_id: u64,

    /// The data storage format (file format name and version).
    pub data_storage_format: DataStorageFormat,

    /// Dataset configuration key/value pairs.
    pub config: HashMap<String, String>,

    /// The version of the associated blob dataset, if any.
    pub blob_dataset_version: Option<u64>,
}

/// The most significant bit of a version number is reserved to mark detached versions.
pub const DETACHED_VERSION_MASK: u64 = 0x8000_0000_0000_0000;
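
/// Returns true if `version` refers to a detached version (the detached bit is set).
///
/// Illustrative sketch (not from the original source), assuming ordinary
/// version numbers never set the top bit:
///
/// ```ignore
/// assert!(!is_detached_version(42));
/// assert!(is_detached_version(42 | DETACHED_VERSION_MASK));
/// ```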
pub fn is_detached_version(version: u64) -> bool {
    version & DETACHED_VERSION_MASK != 0
}
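
/// Compute the starting row offset of each fragment, with one extra trailing
/// entry holding the total row count.
///
/// Illustrative sketch (not from the original source): fragments of 10, 15 and
/// 20 rows would yield offsets `[0, 10, 25, 45]`.
///
/// ```ignore
/// let offsets = compute_fragment_offsets(&fragments);
/// assert_eq!(offsets, vec![0, 10, 25, 45]);
/// ```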
fn compute_fragment_offsets(fragments: &[Fragment]) -> Vec<usize> {
    fragments
        .iter()
        .map(|f| f.num_rows().unwrap_or_default())
        .chain([0]) // Extend by one so the final scanned value is the total row count.
        .scan(0_usize, |offset, len| {
            let start = *offset;
            *offset += len;
            Some(start)
        })
        .collect()
}

impl Manifest {
    /// Create a manifest for the first version of a dataset.
    pub fn new(
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        data_storage_format: DataStorageFormat,
        blob_dataset_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Self {
            schema,
            local_schema,
            version: 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: None,
            transaction_file: None,
            fragment_offsets,
            next_row_id: 0,
            data_storage_format,
            config: HashMap::new(),
            blob_dataset_version,
        }
    }

    /// Create a manifest for the next version of a dataset, carrying over state
    /// (config, row ids, max fragment id, storage format) from `previous`.
    pub fn new_from_previous(
        previous: &Self,
        schema: Schema,
        fragments: Arc<Vec<Fragment>>,
        new_blob_version: Option<u64>,
    ) -> Self {
        let fragment_offsets = compute_fragment_offsets(&fragments);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        let blob_dataset_version = new_blob_version.or(previous.blob_dataset_version);

        Self {
            schema,
            local_schema,
            version: previous.version + 1,
            writer_version: Some(WriterVersion::default()),
            fragments,
            version_aux_data: 0,
            index_section: None,
            timestamp_nanos: 0,
            tag: None,
            reader_feature_flags: 0,
            writer_feature_flags: 0,
            max_fragment_id: previous.max_fragment_id,
            transaction_file: None,
            fragment_offsets,
            next_row_id: previous.next_row_id,
            data_storage_format: previous.data_storage_format.clone(),
            config: previous.config.clone(),
            blob_dataset_version,
        }
    }
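
    /// Return the creation timestamp of this version as a UTC datetime.
    ///
    /// Illustrative sketch (not from the original source) of the round trip
    /// through `set_timestamp`:
    ///
    /// ```ignore
    /// manifest.set_timestamp(1_700_000_000_500_000_000); // nanoseconds since the epoch
    /// assert_eq!(manifest.timestamp().timestamp(), 1_700_000_000);
    /// ```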
    pub fn timestamp(&self) -> DateTime<Utc> {
        let nanos = self.timestamp_nanos % 1_000_000_000;
        let seconds = ((self.timestamp_nanos - nanos) / 1_000_000_000) as i64;
        Utc.from_utc_datetime(
            &DateTime::from_timestamp(seconds, nanos as u32)
                .unwrap_or_default()
                .naive_utc(),
        )
    }

    /// Set the creation timestamp, in nanoseconds since the UNIX epoch.
    pub fn set_timestamp(&mut self, nanos: u128) {
        self.timestamp_nanos = nanos;
    }

    /// Insert or update the given key/value pairs in the dataset config.
    pub fn update_config(&mut self, upsert_values: impl IntoIterator<Item = (String, String)>) {
        self.config.extend(upsert_values);
    }

    /// Remove the given keys from the dataset config.
    pub fn delete_config_keys(&mut self, delete_keys: &[&str]) {
        self.config
            .retain(|key, _| !delete_keys.contains(&key.as_str()));
    }

    /// Replace the schema metadata with the given key/value pairs.
    pub fn update_schema_metadata(&mut self, new_metadata: HashMap<String, String>) {
        self.schema.metadata = new_metadata;
    }

    /// Replace the metadata of the field with `field_id`, if such a field exists.
    pub fn update_field_metadata(&mut self, field_id: i32, new_metadata: HashMap<String, String>) {
        if let Some(field) = self.schema.field_by_id_mut(field_id) {
            field.metadata = new_metadata;
        }
    }

    /// Update `max_fragment_id` from the current fragment list if it has grown.
    pub fn update_max_fragment_id(&mut self) {
        if self.fragments.is_empty() {
            return;
        }

        let max_fragment_id = self
            .fragments
            .iter()
            .map(|f| f.id)
            .max()
            .unwrap()
            .try_into()
            .unwrap();

        match self.max_fragment_id {
            None => {
                self.max_fragment_id = Some(max_fragment_id);
            }
            Some(current_max) => {
                if max_fragment_id > current_max {
                    self.max_fragment_id = Some(max_fragment_id);
                }
            }
        }
    }

    /// Return the highest fragment id, using the stored value if present and
    /// otherwise scanning the fragment list.
    pub fn max_fragment_id(&self) -> Option<u64> {
        if let Some(max_id) = self.max_fragment_id {
            Some(max_id.into())
        } else {
            self.fragments.iter().map(|f| f.id).max()
        }
    }

    /// Return the max field id across the schema and all data files.
    pub fn max_field_id(&self) -> i32 {
        let schema_max_id = self.schema.max_field_id().unwrap_or(-1);
        let fragment_max_id = self
            .fragments
            .iter()
            .flat_map(|f| f.files.iter().flat_map(|file| file.fields.as_slice()))
            .max()
            .copied();
        let fragment_max_id = fragment_max_id.unwrap_or(-1);
        schema_max_id.max(fragment_max_id)
    }

    /// Return the fragments added since the given (older) manifest, i.e. those
    /// with an id greater than its max fragment id.
    pub fn fragments_since(&self, since: &Self) -> Result<Vec<Fragment>> {
        if since.version >= self.version {
            return Err(Error::io(
                format!(
                    "fragments_since: given version {} is newer than manifest version {}",
                    since.version, self.version
                ),
                location!(),
            ));
        }
        let start = since.max_fragment_id();
        Ok(self
            .fragments
            .iter()
            .filter(|&f| start.map(|s| f.id > s).unwrap_or(true))
            .cloned()
            .collect())
    }
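
    /// Return the fragments whose rows overlap the given range of dataset row
    /// offsets, along with the starting row offset of each returned fragment.
    ///
    /// Illustrative sketch (not from the original source), assuming three
    /// fragments of 10, 15 and 20 rows:
    ///
    /// ```ignore
    /// let hits = manifest.fragments_by_offset_range(5..15);
    /// assert_eq!(hits.len(), 2); // the fragments starting at offsets 0 and 10
    /// ```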
    pub fn fragments_by_offset_range(&self, range: Range<usize>) -> Vec<(usize, &Fragment)> {
        let start = range.start;
        let end = range.end;
        let idx = self
            .fragment_offsets
            .binary_search(&start)
            .unwrap_or_else(|idx| idx - 1);

        let mut fragments = vec![];
        for i in idx..self.fragments.len() {
            if self.fragment_offsets[i] >= end
                || self.fragment_offsets[i] + self.fragments[i].num_rows().unwrap_or_default()
                    <= start
            {
                break;
            }
            fragments.push((self.fragment_offsets[i], &self.fragments[i]));
        }

        fragments
    }

    /// True if this dataset uses move-stable row ids.
    pub fn uses_move_stable_row_ids(&self) -> bool {
        self.reader_feature_flags & FLAG_MOVE_STABLE_ROW_IDS != 0
    }

    /// Serialize the manifest to its protobuf wire format.
    pub fn serialized(&self) -> Vec<u8> {
        let pb_manifest: pb::Manifest = self.into();
        pb_manifest.encode_to_vec()
    }

    /// True if the data files use the legacy (v1) Lance file format.
    pub fn should_use_legacy_format(&self) -> bool {
        self.data_storage_format.version == LEGACY_FORMAT_VERSION
    }
}

/// The version of the library that wrote a dataset version.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct WriterVersion {
    pub library: String,
    pub version: String,
}

/// The format and version used to store the data files.
#[derive(Debug, Clone, PartialEq, DeepSizeOf)]
pub struct DataStorageFormat {
    pub file_format: String,
    pub version: String,
}

const LANCE_FORMAT_NAME: &str = "lance";

impl DataStorageFormat {
    pub fn new(version: LanceFileVersion) -> Self {
        Self {
            file_format: LANCE_FORMAT_NAME.to_string(),
            version: version.resolve().to_string(),
        }
    }

    pub fn lance_file_version(&self) -> Result<LanceFileVersion> {
        self.version.parse::<LanceFileVersion>()
    }
}

impl Default for DataStorageFormat {
    fn default() -> Self {
        Self::new(LanceFileVersion::default())
    }
}

impl From<pb::manifest::DataStorageFormat> for DataStorageFormat {
    fn from(pb: pb::manifest::DataStorageFormat) -> Self {
        Self {
            file_format: pb.file_format,
            version: pb.version,
        }
    }
}

/// Which part of a semantic version to bump.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum VersionPart {
    Major,
    Minor,
    Patch,
}

impl WriterVersion {
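    /// Parse the version string as `(major, minor, patch, tag)`.
    ///
    /// Illustrative sketch (not from the original source):
    ///
    /// ```ignore
    /// let wv = WriterVersion { library: "lance".into(), version: "0.10.5".into() };
    /// assert_eq!(wv.semver(), Some((0, 10, 5, None)));
    /// ```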
    pub fn semver(&self) -> Option<(u32, u32, u32, Option<&str>)> {
        let mut parts = self.version.split('.');
        let major = parts.next().unwrap_or("0").parse().ok()?;
        let minor = parts.next().unwrap_or("0").parse().ok()?;
        let patch = parts.next().unwrap_or("0").parse().ok()?;
        let tag = parts.next();
        Some((major, minor, patch, tag))
    }

    /// Like [`Self::semver`], but panics if the version string cannot be parsed.
    pub fn semver_or_panic(&self) -> (u32, u32, u32, Option<&str>) {
        self.semver()
            .unwrap_or_else(|| panic!("Invalid writer version: {}", self.version))
    }

    /// True if this writer version is strictly older than `major.minor.patch`.
    pub fn older_than(&self, major: u32, minor: u32, patch: u32) -> bool {
        let version = self.semver_or_panic();
        (version.0, version.1, version.2) < (major, minor, patch)
    }
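
    /// Return a copy of this version with the given part incremented,
    /// optionally keeping the pre-release tag.
    ///
    /// Illustrative sketch (not from the original source):
    ///
    /// ```ignore
    /// let wv = WriterVersion { library: "lance".into(), version: "0.10.5".into() };
    /// assert_eq!(wv.bump(VersionPart::Minor, false).version, "0.11.5");
    /// ```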
    pub fn bump(&self, part: VersionPart, keep_tag: bool) -> Self {
        let parts = self.semver_or_panic();
        let tag = if keep_tag { parts.3 } else { None };
        let new_parts = match part {
            VersionPart::Major => (parts.0 + 1, parts.1, parts.2, tag),
            VersionPart::Minor => (parts.0, parts.1 + 1, parts.2, tag),
            VersionPart::Patch => (parts.0, parts.1, parts.2 + 1, tag),
        };
        let new_version = if let Some(tag) = tag {
            format!("{}.{}.{}.{}", new_parts.0, new_parts.1, new_parts.2, tag)
        } else {
            format!("{}.{}.{}", new_parts.0, new_parts.1, new_parts.2)
        };
        Self {
            library: self.library.clone(),
            version: new_version,
        }
    }
}

impl Default for WriterVersion {
    #[cfg(not(test))]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
    }

    // In tests, bump the patch version so that version-comparison code paths
    // (e.g. `older_than`) are exercised against the current crate version.
    #[cfg(test)]
    fn default() -> Self {
        Self {
            library: "lance".to_string(),
            version: env!("CARGO_PKG_VERSION").to_string(),
        }
        .bump(VersionPart::Patch, true)
    }
}

impl ProtoStruct for Manifest {
    type Proto = pb::Manifest;
}

impl TryFrom<pb::Manifest> for Manifest {
    type Error = Error;

    fn try_from(p: pb::Manifest) -> Result<Self> {
        let timestamp_nanos = p.timestamp.map(|ts| {
            let sec = ts.seconds as u128 * 1_000_000_000;
            let nanos = ts.nanos as u128;
            sec + nanos
        });
        let writer_version = match p.writer_version {
            Some(pb::manifest::WriterVersion { library, version }) => {
                Some(WriterVersion { library, version })
            }
            _ => None,
        };
        let fragments = Arc::new(
            p.fragments
                .into_iter()
                .map(Fragment::try_from)
                .collect::<Result<Vec<_>>>()?,
        );
        let fragment_offsets = compute_fragment_offsets(fragments.as_slice());
        let fields_with_meta = FieldsWithMeta {
            fields: Fields(p.fields),
            metadata: p.metadata,
        };

        if FLAG_MOVE_STABLE_ROW_IDS & p.reader_feature_flags != 0
            && !fragments.iter().all(|frag| frag.row_id_meta.is_some())
        {
            return Err(Error::Internal {
                message: "All fragments must have row ids".into(),
                location: location!(),
            });
        }

        let data_storage_format = match p.data_format {
            None => {
                if let Some(inferred_version) = Fragment::try_infer_version(fragments.as_ref())? {
                    // If there are data files, they are the best indicator of the format version.
                    DataStorageFormat::new(inferred_version)
                } else if has_deprecated_v2_feature_flag(p.writer_feature_flags) {
                    // No data files to inspect; fall back to the deprecated v2 feature flag.
                    DataStorageFormat::new(LanceFileVersion::Stable)
                } else {
                    DataStorageFormat::new(LanceFileVersion::Legacy)
                }
            }
            Some(format) => DataStorageFormat::from(format),
        };

        let schema = Schema::from(fields_with_meta);
        let local_schema = schema.retain_storage_class(StorageClass::Default);

        Ok(Self {
            schema,
            local_schema,
            version: p.version,
            writer_version,
            version_aux_data: p.version_aux_data as usize,
            index_section: p.index_section.map(|i| i as usize),
            timestamp_nanos: timestamp_nanos.unwrap_or(0),
            tag: if p.tag.is_empty() { None } else { Some(p.tag) },
            reader_feature_flags: p.reader_feature_flags,
            writer_feature_flags: p.writer_feature_flags,
            max_fragment_id: p.max_fragment_id,
            fragments,
            transaction_file: if p.transaction_file.is_empty() {
                None
            } else {
                Some(p.transaction_file)
            },
            fragment_offsets,
            next_row_id: p.next_row_id,
            data_storage_format,
            config: p.config,
            blob_dataset_version: if p.blob_dataset_version == 0 {
                None
            } else {
                Some(p.blob_dataset_version)
            },
        })
    }
}

impl From<&Manifest> for pb::Manifest {
    fn from(m: &Manifest) -> Self {
        let timestamp_nanos = if m.timestamp_nanos == 0 {
            None
        } else {
            let nanos = m.timestamp_nanos % 1_000_000_000;
            let seconds = ((m.timestamp_nanos - nanos) / 1_000_000_000) as i64;
            Some(Timestamp {
                seconds,
                nanos: nanos as i32,
            })
        };
        let fields_with_meta: FieldsWithMeta = (&m.schema).into();
        Self {
            fields: fields_with_meta.fields.0,
            version: m.version,
            writer_version: m
                .writer_version
                .as_ref()
                .map(|wv| pb::manifest::WriterVersion {
                    library: wv.library.clone(),
                    version: wv.version.clone(),
                }),
            fragments: m.fragments.iter().map(pb::DataFragment::from).collect(),
            metadata: fields_with_meta.metadata,
            version_aux_data: m.version_aux_data as u64,
            index_section: m.index_section.map(|i| i as u64),
            timestamp: timestamp_nanos,
            tag: m.tag.clone().unwrap_or_default(),
            reader_feature_flags: m.reader_feature_flags,
            writer_feature_flags: m.writer_feature_flags,
            max_fragment_id: m.max_fragment_id,
            transaction_file: m.transaction_file.clone().unwrap_or_default(),
            next_row_id: m.next_row_id,
            data_format: Some(pb::manifest::DataStorageFormat {
                file_format: m.data_storage_format.file_format.clone(),
                version: m.data_storage_format.version.clone(),
            }),
            config: m.config.clone(),
            blob_dataset_version: m.blob_dataset_version.unwrap_or_default(),
        }
    }
}

#[async_trait]
pub trait SelfDescribingFileReader {
    /// Open a reader for a Lance file that embeds its own manifest (and thus
    /// its schema), reading the manifest from the file itself.
    async fn try_new_self_described(
        object_store: &ObjectStore,
        path: &Path,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized,
    {
        let reader = object_store.open(path).await?;
        Self::try_new_self_described_from_reader(reader.into(), cache).await
    }

    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self>
    where
        Self: Sized;
}

#[async_trait]
impl SelfDescribingFileReader for FileReader {
    async fn try_new_self_described_from_reader(
        reader: Arc<dyn Reader>,
        cache: Option<&LanceCache>,
    ) -> Result<Self> {
        let metadata = Self::read_metadata(reader.as_ref(), cache).await?;
        let manifest_position = metadata.manifest_position.ok_or(Error::Internal {
            message: format!(
                "Attempt to open file at {} as self-describing but it did not contain a manifest",
                reader.path(),
            ),
            location: location!(),
        })?;
        let mut manifest: Manifest = read_struct(reader.as_ref(), manifest_position).await?;
        if manifest.should_use_legacy_format() {
            populate_schema_dictionary(&mut manifest.schema, reader.as_ref()).await?;
        }
        let schema = manifest.schema;
        let max_field_id = schema.max_field_id().unwrap_or_default();
        Self::try_new_from_reader(
            reader.path(),
            reader.clone(),
            Some(metadata),
            schema,
            0,
            0,
            max_field_id,
            cache,
        )
        .await
    }
}

#[cfg(test)]
mod tests {
    use crate::format::DataFile;

    use super::*;

    use arrow_schema::{Field as ArrowField, Schema as ArrowSchema};
    use lance_core::datatypes::Field;

    #[test]
    fn test_writer_version() {
        let wv = WriterVersion::default();
        assert_eq!(wv.library, "lance");
        let parts = wv.semver().unwrap();
        assert_eq!(
            parts,
            (
                env!("CARGO_PKG_VERSION_MAJOR").parse().unwrap(),
                env!("CARGO_PKG_VERSION_MINOR").parse().unwrap(),
                // The test-mode `Default` impl bumps the patch version by one.
                env!("CARGO_PKG_VERSION_PATCH").parse::<u32>().unwrap() + 1,
                None
            )
        );
        assert_eq!(
            format!("{}.{}.{}", parts.0, parts.1, parts.2 - 1),
            env!("CARGO_PKG_VERSION")
        );
        for part in &[VersionPart::Major, VersionPart::Minor, VersionPart::Patch] {
            let bumped = wv.bump(*part, false);
            let bumped_parts = bumped.semver_or_panic();
            assert!(wv.older_than(bumped_parts.0, bumped_parts.1, bumped_parts.2));
        }
    }

    #[test]
    fn test_fragments_by_offset_range() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let actual = manifest.fragments_by_offset_range(0..10);
        assert_eq!(actual.len(), 1);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);

        let actual = manifest.fragments_by_offset_range(5..15);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 0);
        assert_eq!(actual[0].1.id, 0);
        assert_eq!(actual[1].0, 10);
        assert_eq!(actual[1].1.id, 1);

        let actual = manifest.fragments_by_offset_range(15..50);
        assert_eq!(actual.len(), 2);
        assert_eq!(actual[0].0, 10);
        assert_eq!(actual[0].1.id, 1);
        assert_eq!(actual[1].0, 25);
        assert_eq!(actual[1].1.id, 2);

        // Ranges past the end of the dataset return no fragments.
        let actual = manifest.fragments_by_offset_range(45..100);
        assert!(actual.is_empty());

        assert!(manifest.fragments_by_offset_range(200..400).is_empty());
    }
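
    // Illustrative test (not part of the original suite): fragment offsets are
    // the prefix sums of the fragment row counts, plus a trailing total.
    #[test]
    fn test_compute_fragment_offsets() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        assert_eq!(compute_fragment_offsets(&fragments), vec![0, 10, 25, 45]);
    }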

    #[test]
    fn test_max_field_id() {
        // The schema defines fields with ids 0 and 2; a data file references a field with id 43.
        let mut field0 =
            Field::try_from(ArrowField::new("a", arrow_schema::DataType::Int64, false)).unwrap();
        field0.set_id(-1, &mut 0);
        let mut field2 =
            Field::try_from(ArrowField::new("b", arrow_schema::DataType::Int64, false)).unwrap();
        field2.set_id(-1, &mut 2);

        let schema = Schema {
            fields: vec![field0, field2],
            metadata: Default::default(),
        };
        let fragments = vec![
            Fragment {
                id: 0,
                files: vec![DataFile::new_legacy_from_fields("path1", vec![0, 1, 2])],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
            Fragment {
                id: 1,
                files: vec![
                    DataFile::new_legacy_from_fields("path2", vec![0, 1, 43]),
                    DataFile::new_legacy_from_fields("path3", vec![2]),
                ],
                deletion_file: None,
                row_id_meta: None,
                physical_rows: None,
            },
        ];

        let manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        assert_eq!(manifest.max_field_id(), 43);
    }

    #[test]
    fn test_config() {
        let arrow_schema = ArrowSchema::new(vec![ArrowField::new(
            "a",
            arrow_schema::DataType::Int64,
            false,
        )]);
        let schema = Schema::try_from(&arrow_schema).unwrap();
        let fragments = vec![
            Fragment::with_file_legacy(0, "path1", &schema, Some(10)),
            Fragment::with_file_legacy(1, "path2", &schema, Some(15)),
            Fragment::with_file_legacy(2, "path3", &schema, Some(20)),
        ];
        let mut manifest = Manifest::new(
            schema,
            Arc::new(fragments),
            DataStorageFormat::default(),
            None,
        );

        let mut config = manifest.config.clone();
        config.insert("lance.test".to_string(), "value".to_string());
        config.insert("other-key".to_string(), "other-value".to_string());

        manifest.update_config(config.clone());
        assert_eq!(manifest.config, config.clone());

        config.remove("other-key");
        manifest.delete_config_keys(&["other-key"]);
        assert_eq!(manifest.config, config);
    }
}